Compare commits

...

6 Commits

11 changed files with 2740 additions and 912 deletions
+63 -23
View File
@@ -41,6 +41,7 @@ SEVERE_SCRIPT = os.path.join(SCRIPT_DIR, "severe_weather.py")
ICE_CHECK_SCRIPT = os.path.join(SCRIPT_DIR, "check_ghiaccio.py") ICE_CHECK_SCRIPT = os.path.join(SCRIPT_DIR, "check_ghiaccio.py")
IRRIGATION_SCRIPT = os.path.join(SCRIPT_DIR, "smart_irrigation_advisor.py") IRRIGATION_SCRIPT = os.path.join(SCRIPT_DIR, "smart_irrigation_advisor.py")
SNOW_RADAR_SCRIPT = os.path.join(SCRIPT_DIR, "snow_radar.py") SNOW_RADAR_SCRIPT = os.path.join(SCRIPT_DIR, "snow_radar.py")
FOTOVOLTAICO_SCRIPT = os.path.join(SCRIPT_DIR, "fotovoltaico.py")
# FILE STATO VIAGGI # FILE STATO VIAGGI
VIAGGI_STATE_FILE = os.path.join(SCRIPT_DIR, "viaggi_attivi.json") VIAGGI_STATE_FILE = os.path.join(SCRIPT_DIR, "viaggi_attivi.json")
@@ -152,24 +153,25 @@ def get_timezone_from_coords(lat: float, lon: float) -> str:
except Exception as e: except Exception as e:
logger.warning(f"Errore timezonefinder: {e}") logger.warning(f"Errore timezonefinder: {e}")
# Fallback: stima timezone da longitudine (approssimativo) # Fallback: stima da longitudine (approssimativo). Ordine importante: gli offset
# Ogni 15 gradi = 1 ora di differenza da UTC # tipici delle Americhe (-4 NY, -7 Denver, …) rientrano in [-10, 2] e non devono
# essere classificati come Europa prima dei rami per le Americhe.
offset_hours = int(lon / 15) offset_hours = int(lon / 15)
# Mappatura approssimativa a timezone IANA if -8 <= offset_hours <= -6:
if -10 <= offset_hours <= 2: # Europa
return "Europe/Rome"
elif 3 <= offset_hours <= 5: # Medio Oriente
return "Asia/Dubai"
elif 6 <= offset_hours <= 8: # Asia centrale
return "Asia/Kolkata"
elif 9 <= offset_hours <= 11: # Asia orientale
return "Asia/Tokyo"
elif -5 <= offset_hours <= -3: # Americhe orientali
return "America/New_York"
elif -8 <= offset_hours <= -6: # Americhe occidentali
return "America/Los_Angeles" return "America/Los_Angeles"
else: if -5 <= offset_hours <= -3:
return "UTC" return "America/New_York"
if lon < -30 and offset_hours <= -9:
return "America/Los_Angeles"
if 3 <= offset_hours <= 5:
return "Asia/Dubai"
if 6 <= offset_hours <= 8:
return "Asia/Kolkata"
if 9 <= offset_hours <= 11:
return "Asia/Tokyo"
if -10 <= offset_hours <= 2:
return "Europe/Rome"
return "UTC"
def add_viaggio(chat_id: str, location: str, lat: float, lon: float, name: str, timezone: Optional[str] = None) -> None: def add_viaggio(chat_id: str, location: str, lat: float, lon: float, name: str, timezone: Optional[str] = None) -> None:
"""Aggiunge o aggiorna un viaggio attivo per un chat_id (sovrascrive se esiste)""" """Aggiunge o aggiorna un viaggio attivo per un chat_id (sovrascrive se esiste)"""
@@ -237,6 +239,7 @@ def restricted(func):
async def wrapped(update: Update, context: ContextTypes.DEFAULT_TYPE, *args, **kwargs): async def wrapped(update: Update, context: ContextTypes.DEFAULT_TYPE, *args, **kwargs):
user_id = update.effective_user.id user_id = update.effective_user.id
if user_id not in ALLOWED_IDS: if user_id not in ALLOWED_IDS:
logger.warning("Comando da utente non in ALLOWED_IDS: user_id=%s", user_id)
return return
return await func(update, context, *args, **kwargs) return await func(update, context, *args, **kwargs)
return wrapped return wrapped
@@ -248,7 +251,7 @@ async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
[InlineKeyboardButton("🛡️ Pi-hole", callback_data="menu_pihole"), InlineKeyboardButton("🌐 Rete", callback_data="menu_net")], [InlineKeyboardButton("🛡️ Pi-hole", callback_data="menu_pihole"), InlineKeyboardButton("🌐 Rete", callback_data="menu_net")],
[InlineKeyboardButton("🌤️ Meteo Casa", callback_data="req_meteo_home"), InlineKeyboardButton("📜 Logs", callback_data="menu_logs")] [InlineKeyboardButton("🌤️ Meteo Casa", callback_data="req_meteo_home"), InlineKeyboardButton("📜 Logs", callback_data="menu_logs")]
] ]
text = "🎛 **Loogle Control Center v8.1**\nComandi disponibili:\n🔹 `/meteo <città>`\n🔹 `/meteo7 <città>` (Previsione 7gg)\n🔹 Pulsanti sotto" text = "🎛 **Loogle Control Center v8.1**\nComandi disponibili:\n🔹 `/meteo <città>`\n🔹 `/meteo7 <città>` (Previsione 7gg)\n🔹 `/fotovoltaico` (Previsione produzione FV)\n🔹 Pulsanti sotto"
if update.message: await update.message.reply_text(text, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="Markdown") if update.message: await update.message.reply_text(text, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="Markdown")
else: await update.callback_query.edit_message_text(text, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="Markdown") else: await update.callback_query.edit_message_text(text, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="Markdown")
@@ -365,6 +368,18 @@ async def snowradar_command(update: Update, context: ContextTypes.DEFAULT_TYPE)
# Avvia in background # Avvia in background
subprocess.Popen(cmd, cwd=SCRIPT_DIR) subprocess.Popen(cmd, cwd=SCRIPT_DIR)
@restricted
async def fotovoltaico_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Comando /fotovoltaico: previsione produzione 72h future e curva 48h passate (ICON Italia)."""
chat_id = str(update.effective_chat.id)
cmd = ["python3", FOTOVOLTAICO_SCRIPT, "--telegram", "--chat_id", chat_id]
await update.message.reply_text(
"☀️ **Fotovoltaico**\n\n"
"Generazione curve previsione (72h future + 48h passate)... I grafici verranno inviati a breve.",
parse_mode="Markdown"
)
subprocess.Popen(cmd, cwd=SCRIPT_DIR)
@restricted @restricted
async def irrigazione_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: async def irrigazione_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Comando /irrigazione: consulente agronomico per gestione irrigazione""" """Comando /irrigazione: consulente agronomico per gestione irrigazione"""
@@ -642,10 +657,11 @@ async def meteo_viaggio_command(update: Update, context: ContextTypes.DEFAULT_TY
await update.message.reply_text(f"❌ Località '{location}' non trovata. Verifica il nome e riprova.", parse_mode="Markdown") await update.message.reply_text(f"❌ Località '{location}' non trovata. Verifica il nome e riprova.", parse_mode="Markdown")
return return
lat, lon, name, cc = coords lat, lon, name, cc, geo_tz = coords
# Ottieni timezone per questa localizzazione # Fuso: Open-Meteo geocoding espone già timezone IANA; altrimenti timezonefinder / fallback
timezone = get_timezone_from_coords(lat, lon) geo_tz_clean = geo_tz.strip() if isinstance(geo_tz, str) and geo_tz.strip() else ""
timezone = geo_tz_clean or get_timezone_from_coords(lat, lon)
# Conferma riconoscimento località # Conferma riconoscimento località
await update.message.reply_text( await update.message.reply_text(
@@ -741,13 +757,36 @@ async def meteo_viaggio_command(update: Update, context: ContextTypes.DEFAULT_TY
await update.message.reply_text(f"❌ Errore durante l'elaborazione: {str(e)}", parse_mode="Markdown") await update.message.reply_text(f"❌ Errore durante l'elaborazione: {str(e)}", parse_mode="Markdown")
async def scheduled_morning_report(context: ContextTypes.DEFAULT_TYPE) -> None: async def scheduled_morning_report(context: ContextTypes.DEFAULT_TYPE) -> None:
# Esegui una sola chiamata e invia il report a tutti i chat_id # Stesso comportamento di `/meteo` senza argomenti: Casa (+ viaggio se attivo) per utente.
report = call_meteo_script(["--home"]) report_casa = call_meteo_script(["--home"])
for uid in ALLOWED_IDS: for uid in ALLOWED_IDS:
chat_id = str(uid)
try: try:
await context.bot.send_message(chat_id=uid, text=report, parse_mode="Markdown") await context.bot.send_message(
chat_id=uid,
text=f"🏠 **Report Meteo - Casa**\n\n{report_casa}",
parse_mode="Markdown",
)
except Exception: except Exception:
pass pass
viaggio_attivo = get_viaggio(chat_id)
if viaggio_attivo:
report_viaggio = call_meteo_script(
[
"--query",
viaggio_attivo["location"],
"--timezone",
viaggio_attivo.get("timezone", "Europe/Rome"),
]
)
try:
await context.bot.send_message(
chat_id=uid,
text=f"✈️ **Report Meteo - {viaggio_attivo['name']}**\n\n{report_viaggio}",
parse_mode="Markdown",
)
except Exception:
pass
@restricted @restricted
async def button_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: async def button_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
@@ -849,6 +888,7 @@ def main():
application.add_handler(CommandHandler("road", road_command)) application.add_handler(CommandHandler("road", road_command))
application.add_handler(CommandHandler("irrigazione", irrigazione_command)) application.add_handler(CommandHandler("irrigazione", irrigazione_command))
application.add_handler(CommandHandler("snowradar", snowradar_command)) application.add_handler(CommandHandler("snowradar", snowradar_command))
application.add_handler(CommandHandler("fotovoltaico", fotovoltaico_command))
application.add_handler(CallbackQueryHandler(button_handler)) application.add_handler(CallbackQueryHandler(button_handler))
job_queue = application.job_queue job_queue = application.job_queue
+2 -2
View File
@@ -201,11 +201,11 @@ def get_weather_data(lat, lon, model_slug, include_past_days=1):
# Modificati per ridurre falsi positivi mantenendo alta sensibilità # Modificati per ridurre falsi positivi mantenendo alta sensibilità
# ============================================================================= # =============================================================================
H_COLD_THR = 69.0 # hPa (Profondità minima strato freddo) H_COLD_THR = 69.0 # hPa (Profondità minima strato freddo)
T_COLD_THR = 0.09 # °C (Temp max al suolo considerata 'fredda') - mantenuta bassa per evitare falsi negativi T_COLD_THR = -0.5 # °C (Temp max al suolo considerata 'fredda') - abbassata da 0.09 per meno falsi positivi (solo quando è più gelido)
T_MELT_THR = 0.0 # °C (Temp min per considerare uno strato 'in fusione') - aumentata da -0.64°C a 0.0°C per ridurre falsi positivi mantenendo sensibilità T_MELT_THR = 0.0 # °C (Temp min per considerare uno strato 'in fusione') - aumentata da -0.64°C a 0.0°C per ridurre falsi positivi mantenendo sensibilità
RH_MELT_THR = 89.0 # % (Umidità relativa minima nello strato di fusione) RH_MELT_THR = 89.0 # % (Umidità relativa minima nello strato di fusione)
PR_THR_6H = 0.39 # mm/6h PR_THR_6H = 0.39 # mm/6h
PR_THR_1H = 0.1 # mm/h - aumentata da 0.065 per richiedere precipitazione più significativa PR_THR_1H = 0.25 # mm/h - alzata da 0.1 per richiedere precipitazione più significativa (meno allerte gelicidio)
# Differenza minima temperatura tra strato di fusione e suolo (per ridurre falsi positivi) # Differenza minima temperatura tra strato di fusione e suolo (per ridurre falsi positivi)
T_MELT_SURFACE_DIFF = 1.0 # °C - lo strato di fusione deve essere almeno 1°C più caldo del suolo (bilanciato tra riduzione falsi positivi e mantenimento sensibilità) T_MELT_SURFACE_DIFF = 1.0 # °C - lo strato di fusione deve essere almeno 1°C più caldo del suolo (bilanciato tra riduzione falsi positivi e mantenimento sensibilità)
@@ -43,6 +43,7 @@ TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token"
# Zone target (come compaiono tipicamente nel testo del bollettino) # Zone target (come compaiono tipicamente nel testo del bollettino)
TARGET_ZONES = { TARGET_ZONES = {
"EMR-B2": "Costa romagnola", "EMR-B2": "Costa romagnola",
"EMR-B1": "Pianura romagnola",
"EMR-A2": "Alta collina romagnola", "EMR-A2": "Alta collina romagnola",
"EMR-D1": "Pianura bolognese", "EMR-D1": "Pianura bolognese",
} }
+816
View File
@@ -0,0 +1,816 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Fotovoltaico: analisi e previsione produzione con modello ICON Italia (ItaliaMeteo ARPAE).
- Previsione 72h future (Weather Forecast API)
- Previsione teorica 48h passate (Historical Forecast API) per confronto con produzione reale
- Produzione reale (opzionale) da API SolarEdge Monitoring
- Grafici per colpo d'occhio
Configurazione impianto:
- Inverter: 6 kW (SolarEdge)
- 22 pannelli Longi Solar (415 Wp ciascuno, totale 9,13 kWp)
- Orientamenti: vedi PANEL_GROUPS
SolarEdge: imposta SOLAREDGE_API_KEY e SOLAREDGE_SITE_ID (env o file .env / ~/.solaredge_fotovoltaico)
per mostrare la produzione reale sul grafico 48h passate.
"""
from __future__ import annotations
import argparse
import glob
import logging
import os
import sys
from datetime import datetime, timedelta
from io import BytesIO
from typing import Any, Dict, List, Optional, Tuple
import requests
from zoneinfo import ZoneInfo
# Grafici
import matplotlib
matplotlib.use("Agg")
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import numpy as np
from open_meteo_client import open_meteo_get
# -----------------------------------------------------------------------------
# Configurazione
# -----------------------------------------------------------------------------
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
TZ = "Europe/Rome"
TZINFO = ZoneInfo(TZ)
# Casa (stessa di meteo.py)
HOME_LAT = 43.9356
HOME_LON = 12.4296
# Impianto
INVERTER_KW = 6.0
PANEL_WP = 415.0 # Longi Solar, 415 Wp per modulo
NUM_PANELS = 22
P_PEAK_TOTAL_W = NUM_PANELS * PANEL_WP # 9130 Wp = 9,13 kWp
# Fisica termica (PVLib-style): i pannelli producono di più quando fa freddo (Wp @ 25°C).
TEMP_COEFF_POWER = -0.0035 # Longi Hi-MO ~ -0.35%/°C
NOCT = 45.0 # Nominal Operating Cell Temperature (°C)
SYSTEM_LOSSES = 0.10 # 10% perdite fisse (cavi, inverter, sporco) → PR eff. ~0.90
# Correzione empirica: impostabile via env PRODUCTION_CORRECTION_FACTOR o file.
# Se previsti < reali in modo sistematico, provare 1.21.35 (es. in ~/.solaredge_fotovoltaico).
PRODUCTION_CORRECTION_FACTOR = 1.0
# Gruppi pannelli: (tilt_deg, azimuth_compass_reale_deg, numero_pannelli).
# Azimut = bussola reale osservata (0=N, 90=E, 180=S, 270=W). Non usare i valori di monitoring.solaredge (spesso sbagliati).
# 1.1.1 a 72°; 1.1.2-10 a 158°; 1.2.1-2 a 253°; 1.2.3-10 + 1.2.12 a 72°; 1.2.11 a 190°. Due gruppi a 72° accorpati in uno.
PANEL_GROUPS = [
(34, 72, 10), # 1.1.1 + 1.2.3-10 + 1.2.12 (Est)
(34, 158, 9), # 1.1.2 - 1.1.10 (Sud-Sud-Est)
(34, 253, 2), # 1.2.1, 1.2.2 (Ovest-Sud-Ovest)
(34, 190, 1), # 1.2.11 (Sud-Sud-Ovest)
]
# API Open-Meteo: usiamo solo global_tilted_irradiance (W/m²) con &tilt= e &azimuth=.
# È l'irradianza reale sul piano del pannello (diretta+diffusa inclinate); non servono
# direct_radiation/diffuse_radiation separati. shortwave_radiation=GHI orizzontale non usato.
FORECAST_URL = "https://api.open-meteo.com/v1/forecast"
HISTORICAL_FORECAST_URL = "https://historical-forecast-api.open-meteo.com/v1/forecast"
MODEL_ICON_ITALIA = "italia_meteo_arpae_icon_2i"
MODEL_AROME_HD = "meteofrance_arome_france_hd" # Francia + limitrofi; fuori area può non restituire dati
HTTP_HEADERS = {"User-Agent": "loogle-bot-fotovoltaico/2.0"}
# Telegram
TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token")
TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token"
# SolarEdge Monitoring API (opzionale: produzione reale sul grafico 48h)
# Chiave da https://monitoring.solaredge.com (Account → API Access); Site ID dal portale
SOLAREDGE_BASE = "https://monitoringapi.solaredge.com"
SOLAREDGE_SITE_ENERGY_PATH = "/site/{site_id}/energy"
SOLAREDGE_SITE_POWER_PATH = "/site/{site_id}/power"
DEFAULT_SOLAREDGE_SITE_ID = "3079750" # Usato se SOLAREDGE_SITE_ID non è impostato
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
def get_production_correction_factor() -> float:
"""Fattore di correzione produzione (default 1.0). Env: PRODUCTION_CORRECTION_FACTOR."""
s = (os.environ.get("PRODUCTION_CORRECTION_FACTOR") or "").strip()
if s:
try:
return max(0.1, min(3.0, float(s)))
except ValueError:
pass
for path in (
os.path.expanduser("~/.solaredge_fotovoltaico"),
os.path.join(SCRIPT_DIR, ".env"),
):
if not os.path.isfile(path):
continue
try:
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line.startswith("PRODUCTION_CORRECTION_FACTOR="):
v = line.split("=", 1)[1].strip().strip("'\"")
if v:
return max(0.1, min(3.0, float(v)))
except Exception:
pass
return PRODUCTION_CORRECTION_FACTOR
def compass_to_open_meteo_azimuth(compass_deg: float) -> float:
"""Converte azimut compass (0=N, 90=E, 180=S, 270=W) in Open-Meteo (0=S, -90=E, 90=W, 180=N). Formula corretta: compass - 180 (non 180 - compass, altrimenti Est/Ovest si invertono)."""
return compass_deg - 180.0
def calculate_pv_power(gti: float, temp_air: float, n_panels: int) -> float:
"""Potenza DC (kW) del gruppo con fisica termica: freddo = più efficienza rispetto a STC (25°C)."""
if gti <= 0:
return 0.0
t_cell = temp_air + (gti / 800.0) * (NOCT - 20.0)
delta_t = t_cell - 25.0
temp_factor = 1.0 + (TEMP_COEFF_POWER * delta_t)
p_peak_group_kw = (n_panels * PANEL_WP) / 1000.0
p_dc = p_peak_group_kw * (gti / 1000.0) * temp_factor * (1.0 - SYSTEM_LOSSES)
return max(0.0, p_dc)
def fetch_gti_forecast(
lat: float,
lon: float,
tilt: float,
azimuth_om: float,
forecast_hours: int = 72,
past_hours: int = 0,
model: str = MODEL_ICON_ITALIA,
) -> Optional[Dict[str, Any]]:
"""Recupera GTI e temperature_2m dalla Weather Forecast API (modello specificabile)."""
params = {
"latitude": lat,
"longitude": lon,
"timezone": "auto",
"models": model,
"hourly": "global_tilted_irradiance,temperature_2m",
"tilt": tilt,
"azimuth": azimuth_om,
"forecast_hours": forecast_hours,
"past_hours": past_hours,
}
try:
r = open_meteo_get(
FORECAST_URL,
params=params,
headers=HTTP_HEADERS,
timeout=(8, 30),
)
if r.status_code != 200:
logger.warning("Forecast API status %s: %s", r.status_code, r.text[:300])
return None
return r.json()
except Exception as e:
logger.exception("Forecast API error: %s", e)
return None
def fetch_gti_historical(
lat: float,
lon: float,
tilt: float,
azimuth_om: float,
start_date: str,
end_date: str,
) -> Optional[Dict[str, Any]]:
"""Recupera GTI e temperature_2m dalla Historical Forecast API (previsione teorica passata)."""
params = {
"latitude": lat,
"longitude": lon,
"start_date": start_date,
"end_date": end_date,
"timezone": "auto",
"models": MODEL_ICON_ITALIA,
"hourly": "global_tilted_irradiance,temperature_2m",
"tilt": tilt,
"azimuth": azimuth_om,
}
try:
r = open_meteo_get(
HISTORICAL_FORECAST_URL,
params=params,
headers=HTTP_HEADERS,
timeout=(8, 45),
)
if r.status_code != 200:
logger.warning("Historical Forecast API status %s: %s", r.status_code, r.text[:300])
return None
return r.json()
except Exception as e:
logger.exception("Historical Forecast API error: %s", e)
return None
def production_from_groups(
hourly_times: List[str],
gti_per_group: List[List[Optional[float]]],
temp_per_hour: List[Optional[float]],
panel_counts: List[int],
) -> Tuple[List[float], List[float]]:
"""
Calcola la produzione per ora: ogni gruppo con GTI + temperatura aria; fisica termica
(freddo = più efficienza). Produzione totale = SOMMA gruppi, cap inverter, poi fattore correzione.
"""
total_panels = sum(panel_counts)
if total_panels != NUM_PANELS:
logger.warning(
"Somma pannelli per gruppo = %s (attesi %s); verifica PANEL_GROUPS",
total_panels, NUM_PANELS,
)
n = len(hourly_times)
gti_equivalent = []
power_kw = []
correction = get_production_correction_factor()
for i in range(n):
temp = temp_per_hour[i] if i < len(temp_per_hour) and temp_per_hour[i] is not None else 15.0
p_dc_total = 0.0
gti_sum_weighted = 0.0
for gti_list, n_panels in zip(gti_per_group, panel_counts):
if gti_list is not None and i < len(gti_list) and gti_list[i] is not None:
gti_w = gti_list[i]
p_dc_total += calculate_pv_power(gti_w, temp, n_panels)
gti_sum_weighted += gti_w * (n_panels / total_panels)
p_ac_kw = min(p_dc_total, INVERTER_KW) * correction
power_kw.append(round(p_ac_kw, 3))
gti_equivalent.append(gti_sum_weighted)
return gti_equivalent, power_kw
def fetch_forecast_72h(lat: float, lon: float) -> Optional[Tuple[List[str], List[float], List[float]]]:
"""
Ottiene previsione 72h come curve intere: oggi, domani, dopodomani (sempre 3 giorni pieni).
Indipendentemente dall'ora di chiamata, si richiedono dati da mezzanotte di oggi per 72 ore.
"""
now = datetime.now(TZINFO)
midnight_today = now.replace(hour=0, minute=0, second=0, microsecond=0)
past_hours = int((now - midnight_today).total_seconds() / 3600)
# Richiedi da mezzanotte (past_hours) + 72h future
total_hours = past_hours + 72
gti_per_orientation = []
panel_weights = []
times_ref = None
temp_per_hour = None
for tilt, azimuth_compass, count in PANEL_GROUPS:
az_om = compass_to_open_meteo_azimuth(azimuth_compass)
data = fetch_gti_forecast(
lat, lon, tilt, az_om,
forecast_hours=72,
past_hours=past_hours,
)
if not data or "hourly" not in data:
logger.warning("Forecast mancante per tilt=%s az=%s", tilt, azimuth_compass)
gti_per_orientation.append([None] * total_hours)
else:
if times_ref is None:
times_ref = data["hourly"].get("time", [])
raw = data["hourly"].get("temperature_2m")
temp_per_hour = raw if raw else [15.0] * len(times_ref)
gti = data["hourly"].get("global_tilted_irradiance")
if not gti:
gti = data["hourly"].get("global_tilted_irradiance_sum")
gti_per_orientation.append(gti or [None] * total_hours)
panel_weights.append(count)
times = times_ref or []
if not times:
return None
if temp_per_hour is None:
temp_per_hour = [15.0] * len(times)
gti_w, power_kw = production_from_groups(times, gti_per_orientation, temp_per_hour, panel_weights)
# Filtra: solo le 72 ore da mezzanotte di oggi (oggi, domani, dopodomani interi)
end_window = midnight_today + timedelta(hours=72)
dt_list = parse_times_to_datetime(times)
filtered_times = []
filtered_gti = []
filtered_power = []
for i, dt in enumerate(dt_list):
if midnight_today <= dt < end_window and len(filtered_times) < 72:
filtered_times.append(times[i])
filtered_gti.append(gti_w[i] if i < len(gti_w) else 0)
filtered_power.append(power_kw[i] if i < len(power_kw) else 0)
if not filtered_times:
return None
return filtered_times, filtered_gti, filtered_power
def _fetch_forecast_72h_for_model(
lat: float, lon: float, model: str
) -> Optional[Tuple[List[str], List[float]]]:
"""
Come fetch_forecast_72h ma per un singolo modello; ritorna (filtered_times, filtered_power)
o None se il modello non restituisce dati (es. AROME HD fuori copertura).
"""
now = datetime.now(TZINFO)
midnight_today = now.replace(hour=0, minute=0, second=0, microsecond=0)
past_hours = int((now - midnight_today).total_seconds() / 3600)
total_hours = past_hours + 72
gti_per_orientation = []
panel_weights = []
times_ref = None
temp_per_hour = None
for tilt, azimuth_compass, count in PANEL_GROUPS:
az_om = compass_to_open_meteo_azimuth(azimuth_compass)
data = fetch_gti_forecast(
lat, lon, tilt, az_om,
forecast_hours=72,
past_hours=past_hours,
model=model,
)
if not data or "hourly" not in data:
logger.warning("Forecast (%s) mancante per tilt=%s az=%s", model, tilt, azimuth_compass)
gti_per_orientation.append([None] * total_hours)
else:
if times_ref is None:
times_ref = data["hourly"].get("time", [])
raw = data["hourly"].get("temperature_2m")
temp_per_hour = raw if raw else [15.0] * len(times_ref)
gti = data["hourly"].get("global_tilted_irradiance")
if not gti:
gti = data["hourly"].get("global_tilted_irradiance_sum")
gti_per_orientation.append(gti or [None] * total_hours)
panel_weights.append(count)
times = times_ref or []
if not times:
return None
if temp_per_hour is None:
temp_per_hour = [15.0] * len(times)
gti_w, power_kw = production_from_groups(times, gti_per_orientation, temp_per_hour, panel_weights)
end_window = midnight_today + timedelta(hours=72)
dt_list = parse_times_to_datetime(times)
filtered_times = []
filtered_power = []
for i, dt in enumerate(dt_list):
if midnight_today <= dt < end_window and len(filtered_times) < 72:
filtered_times.append(times[i])
filtered_power.append(power_kw[i] if i < len(power_kw) else 0)
if not filtered_times:
return None
return filtered_times, filtered_power
def fetch_forecast_72h_multi(
lat: float, lon: float
) -> Optional[Tuple[List[str], List[Tuple[str, List[float]]]]]:
"""
Previsione 72h da più modelli (ICON Italia + AROME HD se disponibile).
Ritorna (times_72, [(label, power_list), ...]). AROME HD copre Francia e limitrofi;
fuori area può non restituire dati e viene omesso.
"""
result_icon = _fetch_forecast_72h_for_model(lat, lon, MODEL_ICON_ITALIA)
if not result_icon:
return None
ref_times, power_icon = result_icon
n = len(ref_times)
series: List[Tuple[str, List[float]]] = [("ICON Italia", power_icon)]
result_arome = _fetch_forecast_72h_for_model(lat, lon, MODEL_AROME_HD)
if result_arome:
times_arome, power_arome = result_arome
# Allinea alla griglia oraria di ref_times (AROME può avere meno ore, es. 48)
power_aligned = []
for t in ref_times:
if t in times_arome:
idx = times_arome.index(t)
power_aligned.append(power_arome[idx] if idx < len(power_arome) else 0.0)
else:
power_aligned.append(0.0)
# Includi AROME solo se ha dati utili (fuori copertura restituisce spesso tutti zero)
if sum(power_aligned) >= 0.1:
series.append(("AROME HD", power_aligned))
logger.info("AROME HD disponibile per confronto previsioni")
else:
logger.info("AROME HD senza dati utili per questa località (copertura Francia/limitrofi)")
else:
logger.info("AROME HD non disponibile per questa località (copertura Francia/limitrofi)")
return ref_times, series
def fetch_historical_48h(lat: float, lon: float) -> Optional[Tuple[List[str], List[float], List[float]]]:
"""
Ottiene previsione teorica per ieri l'altro e ieri (2 giorni pieni, senza oggi).
Usa orientamenti reali (bussola osservata) e temperature_2m per la fisica termica.
Finestra: solo ieri l'altro + ieri, così il grafico "Ultime 48h" non include la giornata odierna.
"""
today = datetime.now(TZINFO).date()
start_d = datetime.combine(today - timedelta(days=2), datetime.min.time()).replace(tzinfo=TZINFO)
end_d = datetime.combine(today - timedelta(days=1), datetime.min.time()).replace(tzinfo=TZINFO)
start_date = start_d.strftime("%Y-%m-%d")
end_date = end_d.strftime("%Y-%m-%d")
gti_per_orientation = []
panel_weights = []
times_ref = None
temp_per_hour = None
for tilt, azimuth_compass, count in PANEL_GROUPS:
az_om = compass_to_open_meteo_azimuth(azimuth_compass)
data = fetch_gti_historical(lat, lon, tilt, az_om, start_date, end_date)
if not data or "hourly" not in data:
logger.warning("Historical mancante per tilt=%s az=%s", tilt, azimuth_compass)
gti_per_orientation.append([])
else:
if times_ref is None:
times_ref = data["hourly"].get("time", [])
raw = data["hourly"].get("temperature_2m")
temp_per_hour = raw if raw else [15.0] * len(times_ref)
gti = data["hourly"].get("global_tilted_irradiance")
if not gti:
gti = data["hourly"].get("global_tilted_irradiance_sum")
gti_per_orientation.append(gti or [])
panel_weights.append(count)
times = times_ref or []
if not times or not gti_per_orientation:
return None
if temp_per_hour is None:
temp_per_hour = [15.0] * len(times)
n = len(times)
for i, gti_list in enumerate(gti_per_orientation):
if len(gti_list) < n:
gti_per_orientation[i] = gti_list + [None] * (n - len(gti_list))
elif len(gti_list) > n:
gti_per_orientation[i] = gti_list[:n]
gti_w, power_kw = production_from_groups(times, gti_per_orientation, temp_per_hour, panel_weights)
return times, gti_w, power_kw
def load_solaredge_config() -> Tuple[str, str]:
"""Ritorna (api_key, site_id). site_id usa DEFAULT_SOLAREDGE_SITE_ID se non impostato."""
api_key = (os.environ.get("SOLAREDGE_API_KEY") or "").strip()
site_id = (os.environ.get("SOLAREDGE_SITE_ID") or "").strip() or DEFAULT_SOLAREDGE_SITE_ID
if api_key and site_id:
return api_key, site_id
for path in (
os.path.expanduser("~/.solaredge_fotovoltaico"),
os.path.join(SCRIPT_DIR, ".env"),
):
if not os.path.isfile(path):
continue
try:
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" in line:
k, v = line.split("=", 1)
k, v = k.strip(), v.strip().strip('"\'')
if k == "SOLAREDGE_API_KEY":
api_key = api_key or v
elif k == "SOLAREDGE_SITE_ID":
site_id = site_id or v
except Exception as e:
logger.debug("Lettura %s: %s", path, e)
if api_key and site_id:
break
return api_key or "", site_id or DEFAULT_SOLAREDGE_SITE_ID
def fetch_solaredge_energy_48h(api_key: str, site_id: str) -> Optional[Tuple[List[datetime], List[float]]]:
"""Recupera produzione reale per ieri l'altro e ieri (stessa finestra del grafico 48h, senza oggi)."""
today = datetime.now(TZINFO).date()
start_date = (today - timedelta(days=2)).strftime("%Y-%m-%d")
end_date = (today - timedelta(days=1)).strftime("%Y-%m-%d")
url = SOLAREDGE_BASE + SOLAREDGE_SITE_ENERGY_PATH.format(site_id=site_id)
params = {
"api_key": api_key,
"startDate": start_date,
"endDate": end_date,
"timeUnit": "HOUR",
}
try:
r = requests.get(url, params=params, headers=HTTP_HEADERS, timeout=(8, 25))
if r.status_code != 200:
logger.warning("SolarEdge API status %s: %s", r.status_code, r.text[:300])
return None
data = r.json()
energy_block = data.get("energy", {}) or data.get("siteEnergy", {})
values = energy_block.get("values") or data.get("values")
if not values:
logger.warning("SolarEdge: nessun 'energy.values' in risposta")
return None
times_out: List[datetime] = []
power_kw_out: List[float] = []
for item in values:
val = item.get("value")
if val is None:
continue
date_str = item.get("date") or item.get("time") or ""
try:
if "T" in date_str or " " in date_str:
dt = datetime.fromisoformat(date_str.replace("Z", "+00:00").strip())
else:
dt = datetime.strptime(date_str[:10], "%Y-%m-%d").replace(tzinfo=TZINFO)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=TZINFO)
except Exception:
continue
power_kw = float(val) / 1000.0
times_out.append(dt)
power_kw_out.append(round(power_kw, 3))
if not times_out:
return None
return times_out, power_kw_out
except Exception as e:
logger.exception("SolarEdge API error: %s", e)
return None
def kwh_per_day_from_series(
times: List[str],
power_kw: List[float],
) -> List[Tuple[str, float]]:
"""Raggruppa power_kw (kWh/ora) per data e ritorna [(data dd/mm, kwh), ...]."""
from collections import defaultdict
dt_list = parse_times_to_datetime(times)
by_date: Dict[str, float] = defaultdict(float)
for i, dt in enumerate(dt_list):
if i < len(power_kw):
key = dt.strftime("%Y-%m-%d")
by_date[key] += power_kw[i]
out: List[Tuple[str, float]] = []
for d_str in sorted(by_date.keys()):
d = datetime.strptime(d_str, "%Y-%m-%d").date()
label = d.strftime("%d/%m")
out.append((label, round(by_date[d_str], 1)))
return out
def parse_times_to_datetime(times: List[str]) -> List[datetime]:
out = []
for t in times:
try:
if "T" in t:
dt = datetime.fromisoformat(t.replace("Z", "+00:00"))
else:
dt = datetime.strptime(t, "%Y-%m-%d").replace(tzinfo=TZINFO)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=TZINFO)
out.append(dt)
except Exception:
out.append(datetime.now(TZINFO))
return out
def plot_past_48h(
times: List[str],
power_kw: List[float],
title_suffix: str = "",
real_times: Optional[List[datetime]] = None,
real_power_kw: Optional[List[float]] = None,
) -> bytes:
"""Genera il grafico in memoria e ritorna i byte PNG (non salva su disco)."""
fig, ax = plt.subplots(figsize=(10, 4.5))
dt_list = parse_times_to_datetime(times)
ax.fill_between(dt_list, 0, power_kw, alpha=0.5, color="steelblue")
ax.plot(dt_list, power_kw, color="navy", linewidth=1.2, label="Previsione (ICON Italia)")
if real_times and real_power_kw and len(real_times) == len(real_power_kw):
ax.plot(real_times, real_power_kw, color="darkorange", linewidth=1.4, label="Produzione reale (SolarEdge)")
ax.axhline(y=INVERTER_KW, color="red", linestyle="--", alpha=0.7, label=f"Limite inverter {INVERTER_KW} kW")
ax.set_ylabel("Potenza AC (kW)")
ax.set_xlabel("Data / Ora")
ax.set_title(f"Produzione Ieri l'altro e Ieri{title_suffix}")
ax.legend(loc="upper right")
all_vals = list(power_kw) + (list(real_power_kw) if real_power_kw else [])
ax.set_ylim(0, max(INVERTER_KW * 1.05, max(all_vals) * 1.1) if all_vals else 7)
ax.xaxis.set_major_formatter(mdates.DateFormatter("%d/%m %H:%M", tz=TZINFO))
ax.xaxis.set_major_locator(mdates.HourLocator(interval=6))
plt.xticks(rotation=25)
plt.tight_layout()
buf = BytesIO()
plt.savefig(buf, format="png", dpi=120)
plt.close()
return buf.getvalue()
def plot_future_72h(
times: List[str],
power_series: List[Tuple[str, List[float]]],
title_suffix: str = "",
) -> bytes:
"""
Genera il grafico in memoria. power_series: [(label, power_kw_list), ...];
la prima serie ha anche fill_between, le altre solo linea (confronto modelli).
"""
if not power_series:
plt.close("all")
return b""
fig, ax = plt.subplots(figsize=(10, 4.5))
dt_list = parse_times_to_datetime(times)
colors = ["green", "blue", "purple"]
all_vals = []
for i, (label, power_kw) in enumerate(power_series):
if len(power_kw) != len(dt_list):
power_kw = (power_kw + [0.0] * len(dt_list))[: len(dt_list)]
all_vals.extend(power_kw)
color = colors[i % len(colors)]
if i == 0:
ax.fill_between(dt_list, 0, power_kw, alpha=0.4, color=color)
ax.plot(dt_list, power_kw, color=color, linewidth=1.2, label=label)
ax.axhline(y=INVERTER_KW, color="red", linestyle="--", alpha=0.7, label=f"Limite inverter {INVERTER_KW} kW")
ax.set_ylabel("Potenza AC (kW)")
ax.set_xlabel("Data / Ora")
ax.set_title(f"Previsione produzione Oggi, Domani, Dopodomani{title_suffix}")
ax.legend(loc="upper right")
ax.set_ylim(0, max(INVERTER_KW * 1.05, max(all_vals) * 1.1) if all_vals else 7)
ax.xaxis.set_major_formatter(mdates.DateFormatter("%d/%m %H:%M", tz=TZINFO))
ax.xaxis.set_major_locator(mdates.HourLocator(interval=6))
plt.xticks(rotation=25)
plt.tight_layout()
buf = BytesIO()
plt.savefig(buf, format="png", dpi=120)
plt.close()
return buf.getvalue()
def load_bot_token() -> str:
tok = os.environ.get("TELEGRAM_BOT_TOKEN", "").strip() or os.environ.get("BOT_TOKEN", "").strip()
if tok:
return tok
for path in (TOKEN_FILE_HOME, TOKEN_FILE_ETC):
try:
with open(path, "r", encoding="utf-8") as f:
t = f.read().strip()
if t:
return t
except FileNotFoundError:
continue
return ""
def telegram_send_photo(photo_bytes: bytes, caption: str, chat_id: str) -> bool:
"""Invia una foto a Telegram da bytes (nessun file su disco)."""
token = load_bot_token()
if not token:
logger.warning("Token Telegram mancante")
return False
url = f"https://api.telegram.org/bot{token}/sendPhoto"
try:
r = requests.post(
url,
data={"chat_id": chat_id, "caption": caption, "parse_mode": "Markdown"},
files={"photo": ("grafico.png", photo_bytes, "image/png")},
timeout=20,
)
if r.status_code != 200:
logger.error("Telegram sendPhoto %s: %s", r.status_code, r.text[:400])
return False
return True
except Exception as e:
logger.exception("Telegram sendPhoto: %s", e)
return False
def telegram_send_message(text: str, chat_id: str) -> bool:
token = load_bot_token()
if not token:
return False
url = f"https://api.telegram.org/bot{token}/sendMessage"
try:
r = requests.post(
url,
json={"chat_id": chat_id, "text": text, "parse_mode": "Markdown"},
timeout=15,
)
return r.status_code == 200
except Exception as e:
logger.exception("Telegram sendMessage: %s", e)
return False
def main() -> int:
parser = argparse.ArgumentParser(description="Previsione e analisi produzione fotovoltaico (ICON Italia)")
parser.add_argument("--lat", type=float, default=HOME_LAT, help="Latitudine")
parser.add_argument("--lon", type=float, default=HOME_LON, help="Longitudine")
parser.add_argument("--telegram", action="store_true", help="Invia grafici e messaggio a Telegram")
parser.add_argument("--chat_id", type=str, default="", help="Chat ID per invio Telegram")
parser.add_argument("--no-past", action="store_true", help="Salta grafico 48h passate")
parser.add_argument("--no-future", action="store_true", help="Salta grafico 72h future")
args = parser.parse_args()
lat, lon = args.lat, args.lon
send_telegram = args.telegram and args.chat_id.strip()
# Rimuovi eventuali grafici fotovoltaico salvati in precedenza (non ne salviamo più)
for old in glob.glob(os.path.join(SCRIPT_DIR, "fotovoltaico_*.png")):
try:
os.remove(old)
logger.info("Rimosso %s", old)
except OSError as e:
logger.warning("Impossibile rimuovere %s: %s", old, e)
hist: Optional[Tuple[List[str], List[float], List[float]]] = None
fore: Optional[Tuple[List[str], List[float], List[float]]] = None
solaredge_api_key, solaredge_site_id = load_solaredge_config()
real_48h: Optional[Tuple[List[datetime], List[float]]] = None
if solaredge_api_key and solaredge_site_id and not args.no_past:
logger.info("Recupero produzione reale SolarEdge (48h)...")
real_48h = fetch_solaredge_energy_48h(solaredge_api_key, solaredge_site_id)
if not real_48h:
logger.warning("SolarEdge: dati 48h non disponibili (controlla API key e Site ID)")
past_ok = False
if not args.no_past:
logger.info(
"Pannelli: %s gruppi (tilt, azimut°, n) = %s",
len(PANEL_GROUPS),
[(t, a, n) for t, a, n in PANEL_GROUPS],
)
logger.info("Recupero dati Historical Forecast (48h passate)...")
hist = fetch_historical_48h(lat, lon)
if hist:
times_past, _, power_past = hist
past_days_debug = kwh_per_day_from_series(hist[0], hist[2])
logger.info("Historical 48h kWh per giorno (previsione): %s", past_days_debug)
real_t = (real_48h[0], real_48h[1]) if real_48h else (None, None)
img_past = plot_past_48h(times_past, power_past, real_times=real_t[0], real_power_kw=real_t[1])
past_ok = True
if send_telegram:
caption = "📊 *Produzione Ieri l'altro e Ieri* (ICON Italia" + (" + SolarEdge reale" if real_48h else "") + ")"
telegram_send_photo(img_past, caption, args.chat_id)
else:
logger.warning("Nessun dato Historical Forecast per le 48h passate")
future_ok = False
fore_multi: Optional[Tuple[List[str], List[Tuple[str, List[float]]]]] = None
if not args.no_future:
logger.info("Recupero dati Forecast (72h future, ICON + AROME HD)...")
fore_multi = fetch_forecast_72h_multi(lat, lon)
if fore_multi:
times_fut, power_series = fore_multi
img_fut = plot_future_72h(times_fut, power_series)
future_ok = True
if send_telegram:
models_label = " / ".join(s[0] for s in power_series)
telegram_send_photo(
img_fut,
f"☀️ *Previsione Oggi, Domani, Dopodomani* ({models_label})",
args.chat_id,
)
else:
logger.warning("Nessun dato Forecast per le 72h future")
if send_telegram:
lines = ["🖥 *Fotovoltaico*", ""]
if past_ok and hist:
past_days = kwh_per_day_from_series(hist[0], hist[2])
real_days = None
if real_48h:
real_days = kwh_per_day_from_series(
[t.isoformat() for t in real_48h[0]],
real_48h[1],
)
real_by_date = {d: k for d, k in (real_days or [])}
lines.append("📊 *Ultimi 2 giorni*")
for label, kwh in past_days:
part = f" {label} · prev. {kwh} kWh"
if label in real_by_date:
part += f" · _reale_ {real_by_date[label]} kWh"
lines.append(part)
lines.append("")
if future_ok and fore_multi:
times_fut, power_series = fore_multi
fut_days_per_model = [
(name, kwh_per_day_from_series(times_fut, power_list))
for name, power_list in power_series
]
lines.append("☀️ *Prossimi 3 giorni*")
if len(fut_days_per_model) == 1:
for label, kwh in fut_days_per_model[0][1]:
lines.append(f" {label} · {kwh} kWh")
else:
dates = sorted({d for _, days_list in fut_days_per_model for d, _ in days_list})
for d in dates:
parts = [f" {d}"]
for name, days_list in fut_days_per_model:
val = next((k for lbl, k in days_list if lbl == d), None)
if val is not None:
short = "ICON" if "ICON" in name else "AROME"
parts.append(f"{short} {val}")
lines.append(" · ".join(parts) + " kWh")
lines.append("")
models_footer = " · ".join(s[0] for s in power_series)
lines.append(f"_Modello: {models_footer}_")
telegram_send_message("\n".join(lines), args.chat_id)
if not past_ok and not future_ok:
logger.error("Nessun dato disponibile")
return 1
return 0
if __name__ == "__main__":
sys.exit(main())
+17 -1
View File
@@ -14,6 +14,15 @@ import requests
BASE_DIR = os.path.dirname(os.path.abspath(__file__)) BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DEFAULT_PATTERNS = ["*.log", "*_log.txt"] DEFAULT_PATTERNS = ["*.log", "*_log.txt"]
EXCLUDED_FILES = {
"circondario_log.txt",
"irrigation_cron.log",
"road_weather.log",
"snow_radar.log",
}
# Log irrigazione: aggiornato solo quando lo script viene eseguito (cron --auto o /irrigazione).
# Se non c’è un cron giornaliero, il file può restare “non aggiornato” per giorni (normale in inverno).
STALE_EXCLUDE_BASENAMES = {"irrigation_advisor.log"}
TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token") TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token")
TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token" TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token"
@@ -21,7 +30,8 @@ TS_RE = re.compile(r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})")
CATEGORIES = { CATEGORIES = {
"open_meteo_timeout": re.compile( "open_meteo_timeout": re.compile(
r"timeout|timed out|Read timed out|Gateway Time-out|504", re.IGNORECASE r"timeout|timed out|Read timed out|Gateway Time-out|HTTP\s*504|status\s*504|\b504\b",
re.IGNORECASE,
), ),
"ssl_handshake": re.compile(r"handshake", re.IGNORECASE), "ssl_handshake": re.compile(r"handshake", re.IGNORECASE),
"permission_error": re.compile(r"PermissionError|permesso negato|Errno 13", re.IGNORECASE), "permission_error": re.compile(r"PermissionError|permesso negato|Errno 13", re.IGNORECASE),
@@ -119,6 +129,8 @@ def analyze_logs(files: List[str], since: datetime.datetime, max_lines: int) ->
break break
# Verifica se il log è "stale" (non aggiornato da più di 24 ore) # Verifica se il log è "stale" (non aggiornato da più di 24 ore)
if os.path.basename(path) in STALE_EXCLUDE_BASENAMES:
continue # Non segnalare come stale (es. irrigazione: aggiornato solo a ogni run)
if last_ts: if last_ts:
hours_since = (now - last_ts).total_seconds() / 3600.0 hours_since = (now - last_ts).total_seconds() / 3600.0
if hours_since > 24: if hours_since > 24:
@@ -207,6 +219,9 @@ def main():
parser.add_argument("--log", action="append", help="Aggiungi un file log specifico") parser.add_argument("--log", action="append", help="Aggiungi un file log specifico")
args = parser.parse_args() args = parser.parse_args()
run_ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"--- Log Monitor run {run_ts} ---")
if args.log: if args.log:
files = [p for p in args.log if os.path.exists(p)] files = [p for p in args.log if os.path.exists(p)]
else: else:
@@ -215,6 +230,7 @@ def main():
for pat in DEFAULT_PATTERNS: for pat in DEFAULT_PATTERNS:
files.extend(sorted([str(p) for p in Path(BASE_DIR).glob(pat)])) files.extend(sorted([str(p) for p in Path(BASE_DIR).glob(pat)]))
files = sorted(set(files)) files = sorted(set(files))
files = [p for p in files if os.path.basename(p) not in EXCLUDED_FILES]
since = datetime.datetime.now() - datetime.timedelta(days=args.days) since = datetime.datetime.now() - datetime.timedelta(days=args.days)
category_hits, per_file_counts, timeout_minutes, stale_logs = analyze_logs(files, since, args.max_lines) category_hits, per_file_counts, timeout_minutes, stale_logs = analyze_logs(files, since, args.max_lines)
+71 -84
View File
@@ -18,14 +18,10 @@ logger = logging.getLogger(__name__)
# --- CONFIGURAZIONE METEO --- # --- CONFIGURAZIONE METEO ---
HOME_LAT = 43.9356 HOME_LAT = 43.9356
HOME_LON = 12.4296 HOME_LON = 12.4296
HOME_NAME = "🏠 Casa (Wide View ±12km)" HOME_NAME = "🏠 Casa"
TZ = "Europe/Berlin" TZ = "Europe/Berlin"
TZINFO = ZoneInfo(TZ) TZINFO = ZoneInfo(TZ)
# Offset ~12-15km per i 5 punti
OFFSET_LAT = 0.12
OFFSET_LON = 0.16
OPEN_METEO_URL = "https://api.open-meteo.com/v1/forecast" OPEN_METEO_URL = "https://api.open-meteo.com/v1/forecast"
GEOCODING_URL = "https://geocoding-api.open-meteo.com/v1/search" GEOCODING_URL = "https://geocoding-api.open-meteo.com/v1/search"
HTTP_HEADERS = {"User-Agent": "loogle-bot-v10.5"} HTTP_HEADERS = {"User-Agent": "loogle-bot-v10.5"}
@@ -94,14 +90,17 @@ def telegram_send_markdown(message_md: str, chat_ids: Optional[List[str]] = None
def now_local() -> datetime.datetime: def now_local() -> datetime.datetime:
return datetime.datetime.now(TZINFO) return datetime.datetime.now(TZINFO)
def parse_time(t: str) -> datetime.datetime: def parse_time(t: str, tz: Optional[ZoneInfo] = None) -> datetime.datetime:
"""Interpreta un timestamp ISO dell'API nel fuso richiesto (default: Casa / Europe/Berlin)."""
target = tz if tz is not None else TZINFO
try: try:
dt = date_parser.isoparse(t) dt = date_parser.isoparse(t)
if dt.tzinfo is None: return dt.replace(tzinfo=TZINFO) if dt.tzinfo is None:
return dt.astimezone(TZINFO) return dt.replace(tzinfo=target)
return dt.astimezone(target)
except Exception as e: except Exception as e:
logger.error(f"Time parse error: {e}") logger.error(f"Time parse error: {e}")
return now_local() return datetime.datetime.now(target)
def degrees_to_cardinal(d: int) -> str: def degrees_to_cardinal(d: int) -> str:
dirs = ["N", "NE", "E", "SE", "S", "SW", "W", "NW"] dirs = ["N", "NE", "E", "SE", "S", "SW", "W", "NW"]
@@ -168,7 +167,8 @@ def get_coordinates(city_name: str):
res = data["results"][0] res = data["results"][0]
cc = res.get("country_code", "IT").upper() cc = res.get("country_code", "IT").upper()
name = f"{res.get('name')} ({cc})" name = f"{res.get('name')} ({cc})"
return res["latitude"], res["longitude"], name, cc geo_tz = res.get("timezone")
return res["latitude"], res["longitude"], name, cc, geo_tz
except Exception as e: except Exception as e:
logger.error(f"Geocoding error: {e}") logger.error(f"Geocoding error: {e}")
return None return None
@@ -176,12 +176,12 @@ def get_coordinates(city_name: str):
def choose_best_model(lat, lon, cc, is_home=False): def choose_best_model(lat, lon, cc, is_home=False):
""" """
Sceglie il modello meteo. Sceglie il modello meteo.
- Per Casa: usa AROME Seamless (ha snowfall) - Per Casa: usa ICON Italia (ARPAE 2i) - migliore risoluzione spaziale per Italia/San Marino.
- Per altre località: usa best match di Open-Meteo (senza specificare models) - Per altre località: usa best match di Open-Meteo (senza specificare models)
""" """
if is_home: if is_home:
# Per Casa, usa AROME Seamless (ha snowfall e dati dettagliati) # Per Casa, usa ICON Italia (risoluzione spaziale migliore per Italia/San Marino)
return "meteofrance_seamless", "AROME HD" return "italia_meteo_arpae_icon_2i", "ICON Italia"
else: else:
# Per query worldwide, usa best match (Open-Meteo sceglie automaticamente) # Per query worldwide, usa best match (Open-Meteo sceglie automaticamente)
return None, "Best Match" return None, "Best Match"
@@ -189,7 +189,7 @@ def choose_best_model(lat, lon, cc, is_home=False):
def get_forecast(lat, lon, model=None, is_home=False, timezone=None, retry_after_60s=False): def get_forecast(lat, lon, model=None, is_home=False, timezone=None, retry_after_60s=False):
""" """
Recupera forecast. Se model è None, usa best match di Open-Meteo. Recupera forecast. Se model è None, usa best match di Open-Meteo.
Per Casa (is_home=True), usa AROME Seamless. Per Casa (is_home=True), usa ICON Italia.
Args: Args:
retry_after_60s: Se True, attende 10 secondi prima di riprovare (per retry) retry_after_60s: Se True, attende 10 secondi prima di riprovare (per retry)
@@ -202,21 +202,15 @@ def get_forecast(lat, lon, model=None, is_home=False, timezone=None, retry_after
logger.info("Attendo 10 secondi prima del retry...") logger.info("Attendo 10 secondi prima del retry...")
time.sleep(10) time.sleep(10)
# Generiamo 5 punti: Centro, N, S, E, W # Singola coordinata: cielo sopra il punto richiesto (casa o località).
lats = [lat, lat + OFFSET_LAT, lat - OFFSET_LAT, lat, lat]
lons = [lon, lon, lon, lon + OFFSET_LON, lon - OFFSET_LON]
lat_str = ",".join(map(str, lats))
lon_str = ",".join(map(str, lons))
params = { params = {
"latitude": lat_str, "longitude": lon_str, "timezone": tz_to_use, "latitude": lat, "longitude": lon, "timezone": tz_to_use,
"forecast_days": 3, "forecast_days": 3,
"wind_speed_unit": "kmh", "precipitation_unit": "mm", "wind_speed_unit": "kmh", "precipitation_unit": "mm",
"hourly": "temperature_2m,apparent_temperature,relative_humidity_2m,cloud_cover,cloud_cover_low,cloud_cover_mid,cloud_cover_high,windspeed_10m,winddirection_10m,windgusts_10m,precipitation,rain,snowfall,weathercode,is_day,cape,visibility,uv_index" "hourly": "temperature_2m,apparent_temperature,relative_humidity_2m,cloud_cover,cloud_cover_low,cloud_cover_mid,cloud_cover_high,windspeed_10m,winddirection_10m,windgusts_10m,precipitation,rain,showers,snowfall,weathercode,is_day,cape,visibility,uv_index"
} }
# Aggiungi models solo se specificato (per Casa usa AROME, per altre località best match) # Aggiungi models solo se specificato (per Casa usa ICON Italia, per altre località best match)
if model: if model:
params["models"] = model params["models"] = model
@@ -241,11 +235,7 @@ def get_forecast(lat, lon, model=None, is_home=False, timezone=None, retry_after
logger.error(f"API Error {error_details}") logger.error(f"API Error {error_details}")
return None, error_details # Restituisce anche i dettagli dell'errore return None, error_details # Restituisce anche i dettagli dell'errore
response_data = r.json() response_data = r.json()
logger.info("get_forecast ok model=%s points=5 elapsed=%.2fs", model or "best_match", time.time() - t0) logger.info("get_forecast ok model=%s elapsed=%.2fs", model or "best_match", time.time() - t0)
# Open-Meteo per multiple locations (lat/lon separati da virgola) restituisce
# direttamente un dict con "hourly", "daily", etc. che contiene liste di valori
# per ogni location. Per semplicità, restituiamo il dict così com'è
# e lo gestiamo nel codice chiamante
return response_data, None return response_data, None
except requests.exceptions.Timeout as e: except requests.exceptions.Timeout as e:
error_details = f"Timeout dopo 20s: {str(e)}" error_details = f"Timeout dopo 20s: {str(e)}"
@@ -315,23 +305,23 @@ def generate_weather_report(lat, lon, location_name, debug_mode=False, cc="IT",
# Determina se è Casa # Determina se è Casa
is_home = (abs(lat - HOME_LAT) < 0.01 and abs(lon - HOME_LON) < 0.01) is_home = (abs(lat - HOME_LAT) < 0.01 and abs(lon - HOME_LON) < 0.01)
# Usa timezone personalizzata se fornita, altrimenti default # Fuso per l'API: Casa = TZ; località = timezone esplicito/geocoding, altrimenti "auto" (Open-Meteo risolve da lat/lon)
tz_to_use = timezone if timezone else TZ tz_for_api = timezone if timezone else (TZ if is_home else "auto")
model_id, model_name = choose_best_model(lat, lon, cc, is_home=is_home) model_id, model_name = choose_best_model(lat, lon, cc, is_home=is_home)
# Tentativo 1: Richiesta iniziale # Tentativo 1: Richiesta iniziale
data_list, error_details = get_forecast(lat, lon, model_id, is_home=is_home, timezone=tz_to_use, retry_after_60s=False) data_list, error_details = get_forecast(lat, lon, model_id, is_home=is_home, timezone=tz_for_api, retry_after_60s=False)
# Se fallisce e siamo a Casa con AROME, prova retry dopo 10 secondi # Se fallisce e siamo a Casa con ICON Italia, prova retry dopo 10 secondi
if not data_list and is_home and model_id == "meteofrance_seamless": if not data_list and is_home and model_id == "italia_meteo_arpae_icon_2i":
logger.warning(f"Primo tentativo AROME fallito: {error_details}. Retry dopo 10 secondi...") logger.warning(f"Primo tentativo ICON Italia fallito: {error_details}. Retry dopo 10 secondi...")
data_list, error_details = get_forecast(lat, lon, model_id, is_home=is_home, timezone=tz_to_use, retry_after_60s=True) data_list, error_details = get_forecast(lat, lon, model_id, is_home=is_home, timezone=tz_for_api, retry_after_60s=True)
# Se ancora fallisce e siamo a Casa, fallback a best match # Se ancora fallisce e siamo a Casa, fallback a best match
if not data_list and is_home: if not data_list and is_home:
logger.warning(f"AROME fallito anche dopo retry: {error_details}. Fallback a best match...") logger.warning(f"ICON Italia fallito anche dopo retry: {error_details}. Fallback a best match...")
data_list, error_details = get_forecast(lat, lon, None, is_home=False, timezone=tz_to_use, retry_after_60s=False) data_list, error_details = get_forecast(lat, lon, None, is_home=False, timezone=tz_for_api, retry_after_60s=False)
if data_list: if data_list:
model_name = "Best Match (fallback)" model_name = "Best Match (fallback)"
logger.info("Fallback a best match riuscito") logger.info("Fallback a best match riuscito")
@@ -345,20 +335,33 @@ def generate_weather_report(lat, lon, location_name, debug_mode=False, cc="IT",
if not isinstance(data_list, list): data_list = [data_list] if not isinstance(data_list, list): data_list = [data_list]
# Punto centrale (Casa) per dati specifici
data_center = data_list[0] data_center = data_list[0]
# Ora e giorno LT: fuso della località (non San Marino se non è Casa)
if timezone is not None:
tz_to_use = timezone
elif is_home:
tz_to_use = TZ
else:
tz_to_use = data_center.get("timezone") or TZ
try:
tz_to_use_info = ZoneInfo(tz_to_use)
except Exception:
tz_to_use_info = TZINFO
tz_to_use = TZ
hourly_c = data_center.get("hourly", {}) hourly_c = data_center.get("hourly", {})
times = hourly_c.get("time", []) times = hourly_c.get("time", [])
if not times: return "❌ Dati orari mancanti." if not times: return "❌ Dati orari mancanti."
L = len(times) L = len(times)
# --- DATI LOCALI (CASA) --- # --- DATI LOCALI ---
l_temp = safe_get_list(hourly_c, "temperature_2m", L, 0) l_temp = safe_get_list(hourly_c, "temperature_2m", L, 0)
l_app = safe_get_list(hourly_c, "apparent_temperature", L, 0) l_app = safe_get_list(hourly_c, "apparent_temperature", L, 0)
l_rh = safe_get_list(hourly_c, "relative_humidity_2m", L, 50) l_rh = safe_get_list(hourly_c, "relative_humidity_2m", L, 50)
l_prec = safe_get_list(hourly_c, "precipitation", L, 0) l_prec = safe_get_list(hourly_c, "precipitation", L, 0)
l_rain = safe_get_list(hourly_c, "rain", L, 0) l_rain = safe_get_list(hourly_c, "rain", L, 0)
l_showers = safe_get_list(hourly_c, "showers", L, 0)
l_snow = safe_get_list(hourly_c, "snowfall", L, 0) l_snow = safe_get_list(hourly_c, "snowfall", L, 0)
l_wspd = safe_get_list(hourly_c, "windspeed_10m", L, 0) l_wspd = safe_get_list(hourly_c, "windspeed_10m", L, 0)
l_gust = safe_get_list(hourly_c, "windgusts_10m", L, 0) l_gust = safe_get_list(hourly_c, "windgusts_10m", L, 0)
@@ -369,8 +372,8 @@ def generate_weather_report(lat, lon, location_name, debug_mode=False, cc="IT",
l_vis = safe_get_list(hourly_c, "visibility", L, 10000) l_vis = safe_get_list(hourly_c, "visibility", L, 10000)
l_uv = safe_get_list(hourly_c, "uv_index", L, 0) l_uv = safe_get_list(hourly_c, "uv_index", L, 0)
# Se è Casa e AROME non fornisce visibilità (tutti None), recuperala da best match # Se è Casa e ICON Italia non fornisce visibilità (tutti None), recuperala da best match
if is_home and model_id == "meteofrance_seamless": if is_home and model_id == "italia_meteo_arpae_icon_2i":
vis_check = [v for v in l_vis if v is not None] vis_check = [v for v in l_vis if v is not None]
if not vis_check: # Tutti None, recupera da best match if not vis_check: # Tutti None, recupera da best match
vis_data = get_visibility_forecast(lat, lon) vis_data = get_visibility_forecast(lat, lon)
@@ -383,34 +386,22 @@ def generate_weather_report(lat, lon, location_name, debug_mode=False, cc="IT",
l_cl_mid_loc = safe_get_list(hourly_c, "cloud_cover_mid", L, 0) l_cl_mid_loc = safe_get_list(hourly_c, "cloud_cover_mid", L, 0)
l_cl_hig_loc = safe_get_list(hourly_c, "cloud_cover_high", L, 0) l_cl_hig_loc = safe_get_list(hourly_c, "cloud_cover_high", L, 0)
# --- DATI GLOBALI (MEDIA 5 PUNTI) --- # Nuvolosità (stesso punto della località)
acc_cl_tot = [0.0] * L avg_cl_tot = []
points_cl_tot = [ [] for _ in range(L) ] for i in range(L):
cc = get_val(l_cl_tot_loc[i], 0)
for d in data_list: cl = get_val(l_cl_low_loc[i], 0)
h = d.get("hourly", {}) cm = get_val(l_cl_mid_loc[i], 0)
for i in range(L): ch = get_val(l_cl_hig_loc[i], 0)
cc = get_val(safe_get_list(h, "cloud_cover", L)[i]) avg_cl_tot.append(max(cc, cl, cm, ch))
cl = get_val(safe_get_list(h, "cloud_cover_low", L)[i])
cm = get_val(safe_get_list(h, "cloud_cover_mid", L)[i])
ch = get_val(safe_get_list(h, "cloud_cover_high", L)[i])
# Calcolo robusto del totale per singolo punto
real_point_total = max(cc, cl, cm, ch)
acc_cl_tot[i] += real_point_total
points_cl_tot[i].append(real_point_total)
num_points = len(data_list)
avg_cl_tot = [x / num_points for x in acc_cl_tot]
# --- DEBUG MODE --- # --- DEBUG MODE ---
if debug_mode: if debug_mode:
output = f"🔍 **DEBUG METEO (v10.5)**\n" output = f"🔍 **DEBUG METEO (v10.5)**\n"
now_h = now_local().replace(minute=0, second=0, microsecond=0) now_h = datetime.datetime.now(tz_to_use_info).replace(minute=0, second=0, microsecond=0)
idx = 0 idx = 0
for i, t_str in enumerate(times): for i, t_str in enumerate(times):
if parse_time(t_str) >= now_h: if parse_time(t_str, tz_to_use_info) >= now_h:
idx = i idx = i
break break
@@ -419,9 +410,9 @@ def generate_weather_report(lat, lon, location_name, debug_mode=False, cc="IT",
loc_H = get_val(l_cl_hig_loc[idx]) loc_H = get_val(l_cl_hig_loc[idx])
code_now = int(get_val(l_code[idx])) code_now = int(get_val(l_code[idx]))
output += f"Ora: {parse_time(times[idx]).strftime('%H:%M')}\n" output += f"Ora: {parse_time(times[idx], tz_to_use_info).strftime('%H:%M')} (LT)\n"
output += f"📍 **LOCALE (Casa)**: L:{int(loc_L)}% | H:{int(loc_H)}%\n" output += f"📍 **LOCALE**: L:{int(loc_L)}% | H:{int(loc_H)}%\n"
output += f"🌍 **MEDIA GLOBALE**: {int(avg_cl_tot[idx])}%\n" output += f"☁️ **Nv%**: {int(avg_cl_tot[idx])}%\n"
output += f"❄️ **NEVE**: Codice={code_now}, Accumulo={get_val(l_snow[idx])}cm\n" output += f"❄️ **NEVE**: Codice={code_now}, Accumulo={get_val(l_snow[idx])}cm\n"
decision = "H" decision = "H"
@@ -430,8 +421,6 @@ def generate_weather_report(lat, lon, location_name, debug_mode=False, cc="IT",
return output return output
# --- GENERAZIONE TABELLA --- # --- GENERAZIONE TABELLA ---
# Usa timezone personalizzata se fornita
tz_to_use_info = ZoneInfo(tz_to_use) if tz_to_use else TZINFO
now_local_tz = datetime.datetime.now(tz_to_use_info) now_local_tz = datetime.datetime.now(tz_to_use_info)
# Inizia dall'ora corrente (arrotondata all'ora) # Inizia dall'ora corrente (arrotondata all'ora)
@@ -444,12 +433,7 @@ def generate_weather_report(lat, lon, location_name, debug_mode=False, cc="IT",
valid_indices = [] valid_indices = []
for i, t_str in enumerate(times): for i, t_str in enumerate(times):
try: try:
dt = parse_time(t_str) dt = parse_time(t_str, tz_to_use_info)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=tz_to_use_info)
else:
dt = dt.astimezone(tz_to_use_info)
# Include solo timestamp >= current_hour e < end_hour # Include solo timestamp >= current_hour e < end_hour
if current_hour <= dt < end_hour: if current_hour <= dt < end_hour:
valid_indices.append((i, dt)) valid_indices.append((i, dt))
@@ -521,17 +505,19 @@ def generate_weather_report(lat, lon, location_name, debug_mode=False, cc="IT",
Sn = get_val(l_snow[idx], 0) Sn = get_val(l_snow[idx], 0)
Code = int(get_val(l_code[idx], 0)) Code = int(get_val(l_code[idx], 0))
Rain = get_val(l_rain[idx], 0) Rain = get_val(l_rain[idx], 0)
Showers = get_val(l_showers[idx], 0) if idx < len(l_showers) else 0
# Per modelli che espongono rain+showers (es. ICON Italia), usa il totale se precipitation è assente/zero
Pr_display = max(Pr, Rain + Showers)
# Determina se è neve # Determina se è neve
is_snowing = Sn > 0 or (Code in [71, 73, 75, 77, 85, 86]) is_snowing = Sn > 0 or (Code in [71, 73, 75, 77, 85, 86])
# Formattazione MM # Formattazione MM: 0 se nulla, altrimenti il valore (i modelli danno 0 o il valore orario)
p_suffix = "" p_suffix = ""
if Code in [96, 99]: p_suffix = "G" if Code in [96, 99]: p_suffix = "G"
elif Code in [66, 67]: p_suffix = "Z" elif Code in [66, 67]: p_suffix = "Z"
elif is_snowing and Pr >= 0.2: p_suffix = "N" elif is_snowing and Pr_display > 0: p_suffix = "N"
p_s = "0" if Pr_display <= 0 else f"{int(round(Pr_display))}{p_suffix}"
p_s = "--" if Pr < 0.2 else f"{int(round(Pr))}{p_suffix}"
# --- CLOUD LOGIC --- # --- CLOUD LOGIC ---
Cl = int(get_val(l_cl_tot_loc[idx], 0)) Cl = int(get_val(l_cl_tot_loc[idx], 0))
@@ -587,10 +573,10 @@ def generate_weather_report(lat, lon, location_name, debug_mode=False, cc="IT",
w_fmt = f"{w_txt:<5}" w_fmt = f"{w_txt:<5}"
# --- ICONE --- # --- ICONE ---
sky, sgx = get_icon_set(Pr, Sn, Code, IsDay, Cl, Vis, T, Rain, Gust, Cape, dominant_type) sky, sgx = get_icon_set(Pr_display, Sn, Code, IsDay, Cl, Vis, T, Rain, Gust, Cape, dominant_type)
# Se c'è precipitazione nevosa, mostra ❄️ nella colonna Sk (invece di 🌨️) # Se c'è precipitazione nevosa, mostra ❄️ nella colonna Sk (invece di 🌨️)
if is_snowing and Pr >= 0.2: if is_snowing and Pr_display > 0:
sky = "❄️" sky = "❄️"
sky_fmt = f"{sky}{uv_suffix}" sky_fmt = f"{sky}{uv_suffix}"
@@ -615,7 +601,7 @@ if __name__ == "__main__":
args_parser = argparse.ArgumentParser() args_parser = argparse.ArgumentParser()
args_parser.add_argument("--query", help="Nome città") args_parser.add_argument("--query", help="Nome città")
args_parser.add_argument("--home", action="store_true", help="Usa coordinate casa") args_parser.add_argument("--home", action="store_true", help="Usa coordinate casa")
args_parser.add_argument("--debug", action="store_true", help="Mostra i valori dei 5 punti") args_parser.add_argument("--debug", action="store_true", help="Mostra dettaglio debug (nuvole, neve)")
args_parser.add_argument("--chat_id", help="Chat ID Telegram per invio diretto (opzionale, può essere multiplo separato da virgola)") args_parser.add_argument("--chat_id", help="Chat ID Telegram per invio diretto (opzionale, può essere multiplo separato da virgola)")
args_parser.add_argument("--timezone", help="Timezone IANA (es: Europe/Rome, America/New_York)") args_parser.add_argument("--timezone", help="Timezone IANA (es: Europe/Rome, America/New_York)")
args = args_parser.parse_args() args = args_parser.parse_args()
@@ -632,8 +618,9 @@ if __name__ == "__main__":
elif args.query: elif args.query:
coords = get_coordinates(args.query) coords = get_coordinates(args.query)
if coords: if coords:
lat, lon, name, cc = coords lat, lon, name, cc, geo_tz = coords
report = generate_weather_report(lat, lon, name, args.debug, cc) tz = args.timezone or geo_tz
report = generate_weather_report(lat, lon, name, args.debug, cc, timezone=tz)
else: else:
error_msg = f"❌ Città '{args.query}' non trovata." error_msg = f"❌ Città '{args.query}' non trovata."
if chat_ids: if chat_ids:
+18 -6
View File
@@ -11,7 +11,7 @@ from typing import List, Optional
# --- CONFIGURAZIONE --- # --- CONFIGURAZIONE ---
BOT_TOKEN="8155587974:AAF9OekvBpixtk8ZH6KoIc0L8edbhdXt7A4" BOT_TOKEN="8155587974:AAF9OekvBpixtk8ZH6KoIc0L8edbhdXt7A4"
TELEGRAM_CHAT_IDS = ["64463169", "24827341", "132455422", "5405962012"] TELEGRAM_CHAT_IDS = ["64463169"]
# BERSAGLIO (Cloudflare è solitamente il più stabile per i ping) # BERSAGLIO (Cloudflare è solitamente il più stabile per i ping)
TARGET_HOST = "1.1.1.1" TARGET_HOST = "1.1.1.1"
@@ -86,8 +86,20 @@ def measure_quality(chat_ids: Optional[List[str]] = None):
# Parsing Packet Loss # Parsing Packet Loss
# Cerca pattern: "X% packet loss" # Cerca pattern: "X% packet loss"
loss_match = re.search(r'(\d+)% packet loss', output) loss_match = re.search(r'([0-9]+(?:[\\.,][0-9]+)?)% packet loss', output)
loss = int(loss_match.group(1)) if loss_match else 100 if loss_match:
loss_raw = loss_match.group(1).replace(",", ".")
try:
loss = float(loss_raw)
except Exception:
loss = 100.0
else:
loss = 100.0
# Clamp to avoid parsing artifacts (e.g., "0.96078%" -> 0.96078, not 96078).
if loss < 0:
loss = 0.0
if loss > 100:
loss = 100.0
# Parsing Jitter (mdev) # Parsing Jitter (mdev)
# Output tipico: rtt min/avg/max/mdev = 10.1/12.5/40.2/5.1 ms # Output tipico: rtt min/avg/max/mdev = 10.1/12.5/40.2/5.1 ms
@@ -101,7 +113,7 @@ def measure_quality(chat_ids: Optional[List[str]] = None):
else: else:
avg_ping = 0.0 avg_ping = 0.0
result_line = f"Risultati: Loss={loss}% | Jitter={jitter}ms | AvgPing={avg_ping}ms" result_line = f"Risultati: Loss={loss:.2f}% | Jitter={jitter}ms | AvgPing={avg_ping}ms"
print(result_line) print(result_line)
log_line(f"INFO {result_line}") log_line(f"INFO {result_line}")
@@ -116,7 +128,7 @@ def measure_quality(chat_ids: Optional[List[str]] = None):
# NUOVO ALLARME # NUOVO ALLARME
msg = f"📉 **DEGRADO QUALITÀ LINEA**\n\n" msg = f"📉 **DEGRADO QUALITÀ LINEA**\n\n"
if loss >= LIMIT_LOSS: if loss >= LIMIT_LOSS:
msg += f"🔴 **Packet Loss:** `{loss}%` (Soglia {LIMIT_LOSS}%)\n" msg += f"🔴 **Packet Loss:** `{loss:.2f}%` (Soglia {LIMIT_LOSS}%)\n"
if jitter >= LIMIT_JITTER: if jitter >= LIMIT_JITTER:
msg += f"⚠️ **Jitter (Instabilità):** `{jitter}ms` (Soglia {LIMIT_JITTER}ms)\n" msg += f"⚠️ **Jitter (Instabilità):** `{jitter}ms` (Soglia {LIMIT_JITTER}ms)\n"
@@ -133,7 +145,7 @@ def measure_quality(chat_ids: Optional[List[str]] = None):
# RECOVERY # RECOVERY
msg = f"✅ **QUALITÀ LINEA RIPRISTINATA**\n\n" msg = f"✅ **QUALITÀ LINEA RIPRISTINATA**\n\n"
msg += f"I parametri sono rientrati nella norma.\n" msg += f"I parametri sono rientrati nella norma.\n"
msg += f"Ping: `{avg_ping}ms` | Jitter: `{jitter}ms` | Loss: `{loss}%`" msg += f"Ping: `{avg_ping}ms` | Jitter: `{jitter}ms` | Loss: `{loss:.2f}%`"
send_telegram(msg, chat_ids=chat_ids) send_telegram(msg, chat_ids=chat_ids)
save_state(False) save_state(False)
print("Recovery inviata.") print("Recovery inviata.")
+338 -225
View File
@@ -47,6 +47,7 @@ MODEL_NAMES = {
"icon_d2": "ICON-D2", "icon_d2": "ICON-D2",
"gfs_global": "GFS", "gfs_global": "GFS",
"ecmwf_ifs04": "ECMWF", "ecmwf_ifs04": "ECMWF",
"ecmwf_ifs": "ECMWF IFS",
"jma_msm": "JMA MSM", "jma_msm": "JMA MSM",
"metno_nordic": "Yr.no", "metno_nordic": "Yr.no",
"ukmo_global": "UK MetOffice", "ukmo_global": "UK MetOffice",
@@ -54,29 +55,31 @@ MODEL_NAMES = {
"italia_meteo_arpae_icon_2i": "ICON Italia (ARPAE 2i)" "italia_meteo_arpae_icon_2i": "ICON Italia (ARPAE 2i)"
} }
# Per Casa/Italia: forecast_days per modello lungo termine (come Agent Irrigazione / OPEN_METEO_MODELS.md)
LONG_TERM_FORECAST_DAYS = {
"italia_meteo_arpae_icon_2i": 10,
"ecmwf_ifs": 10,
"meteofrance_seamless": 4,
}
def choose_models_by_country(cc, is_home=False): def choose_models_by_country(cc, is_home=False):
""" """
Seleziona modelli meteo ottimali. Seleziona modelli meteo ottimali.
- Per Casa: usa AROME Seamless e ICON-D2 (alta risoluzione) - Per Casa e Italia: 0-2d mediana ICON Italia + AROME HD; 3-10d mediana ICON Italia + ECMWF IFS + ARPEGE.
- Per Italia: usa italia_meteo_arpae_icon_2i (include snow_depth quando > 0) - Per altre località: best match Open-Meteo.
- Per altre località: usa best match di Open-Meteo (senza specificare models)
Ritorna (short_term_models, long_term_models) Ritorna (short_term_models, long_term_models)
""" """
cc = cc.upper() if cc else "UNKNOWN" cc = cc.upper() if cc else "UNKNOWN"
# Modelli a lungo termine (sempre globali, funzionano ovunque)
long_term_default = ["gfs_global", "ecmwf_ifs04"] long_term_default = ["gfs_global", "ecmwf_ifs04"]
if is_home: if is_home or cc == "IT":
# Per Casa, usa AROME Seamless, ICON-D2 e ICON Italia (alta risoluzione europea) # 0-2d: due modelli ad alta risoluzione (mediana). 3-10d: tre modelli (mediana, come Irrigazione).
# ICON Italia include snow_depth quando disponibile (> 0) return (
return ["meteofrance_seamless", "icon_d2", "italia_meteo_arpae_icon_2i"], long_term_default ["italia_meteo_arpae_icon_2i", "meteofrance_arome_france_hd"],
elif cc == "IT": ["italia_meteo_arpae_icon_2i", "ecmwf_ifs", "meteofrance_seamless"],
# Per Italia, usa ICON Italia (ARPAE 2i) che include snow_depth quando disponibile )
return ["italia_meteo_arpae_icon_2i"], long_term_default
else: else:
# Per query worldwide, usa best match (Open-Meteo sceglie automaticamente)
# Ritorna None per indicare best match
return None, long_term_default return None, long_term_default
def get_bot_token(): def get_bot_token():
@@ -127,9 +130,9 @@ def get_weather_multi_model(lat, lon, short_term_models, long_term_models, forec
url = "https://api.open-meteo.com/v1/forecast" url = "https://api.open-meteo.com/v1/forecast"
params = { params = {
"latitude": lat, "longitude": lon, "latitude": lat, "longitude": lon,
"hourly": "temperature_2m,precipitation_probability,precipitation,snowfall,rain,snow_depth,weathercode,windspeed_10m,windgusts_10m,winddirection_10m,dewpoint_2m,relative_humidity_2m,cloud_cover,soil_temperature_0cm", "hourly": "temperature_2m,precipitation,snowfall,rain,snow_depth,weathercode,windspeed_10m,windgusts_10m,winddirection_10m,dewpoint_2m,relative_humidity_2m,cloud_cover,soil_temperature_0cm",
"daily": "temperature_2m_max,temperature_2m_min,precipitation_sum,precipitation_hours,precipitation_probability_max,snowfall_sum,showers_sum,rain_sum,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max", "daily": "temperature_2m_max,temperature_2m_min,precipitation_sum,precipitation_hours,snowfall_sum,showers_sum,rain_sum,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max",
"timezone": timezone if timezone else TZ_STR, "forecast_days": min(forecast_days, 3) # Limita a 3 giorni per modelli ad alta risoluzione "timezone": timezone if timezone else TZ_STR, "forecast_days": min(forecast_days, 3)
} }
try: try:
resp = open_meteo_get(url, params=params, timeout=(5, 20)) resp = open_meteo_get(url, params=params, timeout=(5, 20))
@@ -160,17 +163,28 @@ def get_weather_multi_model(lat, lon, short_term_models, long_term_models, forec
except: except:
results["best_match"] = None results["best_match"] = None
else: else:
# Modelli specifici (per Casa: AROME + ICON, per Italia: ICON ARPAE) # Modelli specifici: 0-2d ICON Italia + AROME HD (mediana); AROME HD solo 2 giorni
for model in short_term_models: for model in short_term_models:
url = "https://api.open-meteo.com/v1/forecast" url = "https://api.open-meteo.com/v1/forecast"
# Per italia_meteo_arpae_icon_2i, includi sempre snow_depth (supportato quando > 0) if model == "italia_meteo_arpae_icon_2i":
hourly_params = "temperature_2m,precipitation_probability,precipitation,snowfall,rain,snow_depth,weathercode,windspeed_10m,windgusts_10m,winddirection_10m,dewpoint_2m,relative_humidity_2m,cloud_cover,soil_temperature_0cm" hourly_params = "rain,showers,snowfall,snow_depth,precipitation,temperature_2m,weathercode,windspeed_10m,winddirection_10m"
daily_params = "temperature_2m_max,temperature_2m_min,showers_sum,rain_sum,snowfall_sum,precipitation_sum,precipitation_hours,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max"
fd_short = min(forecast_days, 7)
elif model == "meteofrance_arome_france_hd":
# AROME HD: 2 giorni, set variabili ridotto (no snow_depth/showers in output)
hourly_params = "temperature_2m,precipitation,snowfall,rain,weathercode,windspeed_10m,winddirection_10m"
daily_params = "temperature_2m_max,temperature_2m_min,precipitation_sum,precipitation_hours,snowfall_sum,rain_sum,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max"
fd_short = 2
else:
hourly_params = "temperature_2m,precipitation,snowfall,rain,snow_depth,weathercode,windspeed_10m,windgusts_10m,winddirection_10m,dewpoint_2m,relative_humidity_2m,cloud_cover,soil_temperature_0cm"
daily_params = "temperature_2m_max,temperature_2m_min,precipitation_sum,precipitation_hours,snowfall_sum,showers_sum,rain_sum,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max"
fd_short = min(forecast_days, 3)
params = { params = {
"latitude": lat, "longitude": lon, "latitude": lat, "longitude": lon,
"hourly": hourly_params, "hourly": hourly_params,
"daily": "temperature_2m_max,temperature_2m_min,precipitation_sum,precipitation_hours,precipitation_probability_max,snowfall_sum,showers_sum,rain_sum,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max", "daily": daily_params,
"timezone": timezone if timezone else TZ_STR, "models": model, "forecast_days": min(forecast_days, 3) # Limita a 3 giorni per modelli ad alta risoluzione "timezone": timezone if timezone else TZ_STR, "models": model,
"forecast_days": fd_short
} }
try: try:
resp = open_meteo_get(url, params=params, timeout=(5, 20)) resp = open_meteo_get(url, params=params, timeout=(5, 20))
@@ -220,14 +234,25 @@ def get_weather_multi_model(lat, lon, short_term_models, long_term_models, forec
except: except:
results[model] = None results[model] = None
# Recupera modelli a lungo termine (globale, fino a 10 giorni) # Recupera modelli a lungo termine (3-10d): tre modelli per mediana (come Agent Irrigazione)
for model in long_term_models: for model in (long_term_models or []):
url = "https://api.open-meteo.com/v1/forecast" url = "https://api.open-meteo.com/v1/forecast"
fd_long = LONG_TERM_FORECAST_DAYS.get(model, forecast_days)
if model == "ecmwf_ifs":
hourly_params = "rain,showers,snowfall,precipitation,temperature_2m,weathercode,windspeed_10m,winddirection_10m"
daily_params = "temperature_2m_max,temperature_2m_min,showers_sum,rain_sum,snowfall_sum,precipitation_sum,precipitation_hours,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max"
elif model == "meteofrance_seamless":
hourly_params = "temperature_2m,precipitation,snowfall,rain,weathercode,windspeed_10m,windgusts_10m,winddirection_10m"
daily_params = "temperature_2m_max,temperature_2m_min,precipitation_sum,precipitation_hours,snowfall_sum,rain_sum,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max"
else:
# ICON Italia e altri
hourly_params = "rain,showers,snowfall,snow_depth,precipitation,temperature_2m,weathercode,windspeed_10m,winddirection_10m"
daily_params = "temperature_2m_max,temperature_2m_min,showers_sum,rain_sum,snowfall_sum,precipitation_sum,precipitation_hours,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max"
params = { params = {
"latitude": lat, "longitude": lon, "latitude": lat, "longitude": lon,
"hourly": "temperature_2m,precipitation_probability,precipitation,snowfall,rain,snow_depth,weathercode,windspeed_10m,windgusts_10m,winddirection_10m,dewpoint_2m,relative_humidity_2m,cloud_cover,soil_temperature_0cm", "hourly": hourly_params,
"daily": "temperature_2m_max,temperature_2m_min,precipitation_sum,precipitation_hours,precipitation_probability_max,snowfall_sum,showers_sum,rain_sum,weathercode,winddirection_10m_dominant,windspeed_10m_max,windgusts_10m_max", "daily": daily_params,
"timezone": timezone if timezone else TZ_STR, "models": model, "forecast_days": forecast_days "timezone": timezone if timezone else TZ_STR, "models": model, "forecast_days": fd_long
} }
try: try:
resp = open_meteo_get(url, params=params, timeout=(5, 25)) resp = open_meteo_get(url, params=params, timeout=(5, 25))
@@ -260,6 +285,163 @@ def get_weather_multi_model(lat, lon, short_term_models, long_term_models, forec
return results return results
def _normalize_time_key(t):
"""Normalizza timestamp per confronto (YYYY-MM-DDTHH:MM)."""
if not t or not isinstance(t, str):
return str(t) if t else ""
return t.strip()[:16]
def _median_or_single(values):
"""Mediana dei valori numerici; ignora None. Con 2 valori restituisce la media dei due."""
nums = [float(v) for v in values if v is not None]
if not nums:
return None
if len(nums) == 1:
return nums[0]
return median(nums)
# Chiavi che esistono solo su ICON Italia (no merge, si tiene il valore da quel modello)
HOURLY_KEYS_ICON_ONLY = ["snow_depth", "showers"]
DAILY_KEYS_ICON_ONLY = ["showers_sum"]
def _merge_hourly_median(hourly_by_model, single_source_keys=None, single_source_model=None):
"""
Unisce hourly da più modelli: mediana per ogni timestamp.
single_source_keys: per queste chiavi si prende il valore solo da single_source_model (es. ICON Italia per snow_depth, showers).
"""
single_source_keys = single_source_keys or []
time_idx = {}
all_times = []
for _model, h in hourly_by_model:
times = h.get("time", []) or []
for t in times:
k = _normalize_time_key(str(t)) if t else ""
if k and k not in time_idx:
time_idx[k] = len(all_times)
all_times.append(t if isinstance(t, str) else k)
if not all_times:
return {"time": [], "temperature_2m": [], "precipitation": [], "snowfall": [], "rain": [], "weathercode": [], "windspeed_10m": [], "winddirection_10m": [], "snow_depth": [], "dewpoint_2m": [], "cloud_cover": [], "soil_temperature_0cm": []}
# Raccogli tutte le chiavi numeriche dal primo modello che le ha
all_keys = []
for _model, h in hourly_by_model:
for key in (h.keys() - {"time"}):
if key not in all_keys:
all_keys.append(key)
out = {"time": all_times}
for key in all_keys:
out[key] = []
for ref_t in all_times:
ref_k = _normalize_time_key(str(ref_t))
if key in single_source_keys and single_source_model:
val = None
for m, h in hourly_by_model:
if m != single_source_model:
continue
times = h.get("time", []) or []
arr = h.get(key, []) or []
for i, t in enumerate(times):
if _normalize_time_key(str(t)) == ref_k and i < len(arr) and arr[i] is not None:
try:
val = float(arr[i]) if key != "weathercode" else (int(arr[i]) if arr[i] is not None else None)
except (TypeError, ValueError):
pass
break
break
out[key].append(val)
else:
vals = []
for _m, h in hourly_by_model:
times = h.get("time", []) or []
arr = h.get(key, []) or []
for i, t in enumerate(times):
if _normalize_time_key(str(t)) == ref_k and i < len(arr) and arr[i] is not None:
try:
vals.append(float(arr[i]))
except (TypeError, ValueError):
pass
break
out[key].append(_median_or_single(vals) if vals else None)
n = len(out["time"])
if n > 1:
order = sorted(range(n), key=lambda i: str(out["time"][i]))
out["time"] = [out["time"][i] for i in order]
for key in all_keys:
if key == "time":
continue
if len(out.get(key, [])) == n:
out[key] = [out[key][i] for i in order]
return out
def _merge_daily_median(daily_by_model, single_source_keys=None, single_source_model=None):
"""Unisce daily da più modelli: mediana per data. single_source_keys: valore solo da single_source_model."""
single_source_keys = single_source_keys or []
time_idx = {}
all_times = []
for _model, d in daily_by_model:
times = d.get("time", []) or []
for t in times:
key = str(t)[:10] if t else ""
if key and key not in time_idx:
time_idx[key] = len(all_times)
all_times.append(key)
if not all_times:
return {"time": [], "temperature_2m_max": [], "temperature_2m_min": [], "precipitation_sum": [], "precipitation_hours": [], "snowfall_sum": [], "showers_sum": [], "rain_sum": [], "weathercode": [], "winddirection_10m_dominant": [], "windspeed_10m_max": [], "windgusts_10m_max": []}
all_keys = []
for _model, d in daily_by_model:
for key in (d.keys() - {"time"}):
if key not in all_keys:
all_keys.append(key)
out = {"time": all_times}
for key in all_keys:
out[key] = []
for date_str in all_times:
if key in single_source_keys and single_source_model:
val = None
for m, d in daily_by_model:
if m != single_source_model:
continue
times = d.get("time", []) or []
arr = d.get(key, []) or []
for i, t in enumerate(times):
if str(t)[:10] == date_str and i < len(arr) and arr[i] is not None:
try:
val = float(arr[i]) if key != "weathercode" else (int(arr[i]) if arr[i] is not None else None)
except (TypeError, ValueError):
pass
break
break
out[key].append(val)
else:
vals = []
for _m, d in daily_by_model:
times = d.get("time", []) or []
arr = d.get(key, []) or []
for i, t in enumerate(times):
if str(t)[:10] == date_str and i < len(arr) and arr[i] is not None:
try:
vals.append(float(arr[i]))
except (TypeError, ValueError):
pass
break
out[key].append(_median_or_single(vals) if vals else None)
# Ordina cronologicamente (evita buchi nel report se l'unione non era ordinata)
n = len(out["time"])
if n > 1:
order = sorted(range(n), key=lambda i: out["time"][i])
out["time"] = [out["time"][i] for i in order]
for key in all_keys:
if key == "time":
continue
if len(out.get(key, [])) == n:
out[key] = [out[key][i] for i in order]
return out
def merge_multi_model_forecast(models_data, forecast_days=10): def merge_multi_model_forecast(models_data, forecast_days=10):
"""Combina dati da modelli a breve e lungo termine in un forecast unificato""" """Combina dati da modelli a breve e lungo termine in un forecast unificato"""
merged = { merged = {
@@ -269,11 +451,11 @@ def merge_multi_model_forecast(models_data, forecast_days=10):
"temperature_2m_min": [], "temperature_2m_min": [],
"precipitation_sum": [], "precipitation_sum": [],
"precipitation_hours": [], "precipitation_hours": [],
"precipitation_probability_max": [],
"snowfall_sum": [], "snowfall_sum": [],
"showers_sum": [], "showers_sum": [],
"rain_sum": [], "rain_sum": [],
"weathercode": [], "weathercode": [],
"winddirection_10m_dominant": [],
"windspeed_10m_max": [], "windspeed_10m_max": [],
"windgusts_10m_max": [] "windgusts_10m_max": []
}, },
@@ -288,189 +470,147 @@ def merge_multi_model_forecast(models_data, forecast_days=10):
"windspeed_10m": [], "windspeed_10m": [],
"winddirection_10m": [], "winddirection_10m": [],
"dewpoint_2m": [], "dewpoint_2m": [],
"precipitation_probability": [],
"cloud_cover": [], "cloud_cover": [],
"soil_temperature_0cm": [] "soil_temperature_0cm": []
}, },
"models_used": [] "models_used": []
} }
# Trova modello a breve termine disponibile (cerca tutti i modelli con type "short_term") cutoff_day = 2 # 0-2d alta risoluzione, 3-10d mediana tre modelli
# Priorità: ICON Italia per snow_depth, altrimenti primo disponibile short_term_list = [(m, models_data[m]) for m in models_data if models_data.get(m) and models_data[m].get("model_type") == "short_term"]
short_term_data = None long_term_list = [(m, models_data[m]) for m in models_data if models_data.get(m) and models_data[m].get("model_type") == "long_term"]
short_term_model = None
icon_italia_data = None
icon_italia_model = None
# Prima cerca ICON Italia (ha snow_depth quando disponibile) if not short_term_list and not long_term_list:
# Cerca anche altri modelli che potrebbero avere snow_depth (icon_d2, etc.)
for model in models_data.keys():
if models_data[model] and models_data[model].get("model_type") == "short_term":
# Priorità a ICON Italia, ma cerca anche altri modelli con snow_depth
if model == "italia_meteo_arpae_icon_2i":
icon_italia_data = models_data[model]
icon_italia_model = model
# ICON-D2 può avere anche snow_depth
elif model == "icon_d2" and icon_italia_data is None:
# Usa ICON-D2 come fallback se ICON Italia non disponibile
hourly_data = models_data[model].get("hourly", {})
snow_depth_values = hourly_data.get("snow_depth", []) if hourly_data else []
# Verifica se ha dati di snow_depth validi
has_valid_snow_depth = False
if snow_depth_values:
for sd in snow_depth_values[:24]:
if sd is not None:
try:
if float(sd) > 0:
has_valid_snow_depth = True
break
except (ValueError, TypeError):
continue
if has_valid_snow_depth:
icon_italia_data = models_data[model]
icon_italia_model = model
# Poi cerca primo modello disponibile (per altri parametri)
for model in models_data.keys():
if models_data[model] and models_data[model].get("model_type") == "short_term":
short_term_data = models_data[model]
short_term_model = model
break
# Trova modello a lungo termine disponibile (cerca tutti i modelli con type "long_term")
long_term_data = None
long_term_model = None
for model in models_data.keys():
if models_data[model] and models_data[model].get("model_type") == "long_term":
long_term_data = models_data[model]
long_term_model = model
break
if not short_term_data and not long_term_data:
return None return None
# Usa dati a breve termine per primi 2-3 giorni, poi passa a lungo termine daily_keys = list(merged["daily"].keys())
cutoff_day = 2 # Usa modelli ad alta risoluzione per primi 2 giorni hourly_keys = list(merged["hourly"].keys())
if short_term_data: def ensure_merged_keys(merged, daily_times, hourly_times):
# Gestisci best_match o modelli specifici for k in daily_keys:
if short_term_model == "best_match": if k == "time":
model_display = "Best Match" continue
else: while len(merged["daily"][k]) < len(merged["daily"]["time"]):
model_display = MODEL_NAMES.get(short_term_model, short_term_model) merged["daily"][k].append(None)
# Verifica se ICON Italia ha dati di snow_depth (controllo diretto, non solo il flag) for k in hourly_keys:
has_icon_snow_depth = False if k == "time":
if icon_italia_data: continue
icon_hourly = icon_italia_data.get("hourly", {}) while len(merged["hourly"][k]) < len(merged["hourly"]["time"]):
icon_snow_depth = icon_hourly.get("snow_depth", []) if icon_hourly else [] merged["hourly"][k].append(None)
# Verifica se ci sono dati non-null di snow_depth
if icon_snow_depth:
for sd in icon_snow_depth[:72]: # Controlla prime 72h
if sd is not None:
try:
if float(sd) > 0: # Anche valori piccoli
has_icon_snow_depth = True
break
except (ValueError, TypeError):
continue
# Se ICON Italia ha dati di snow_depth, aggiungilo ai modelli usati # ---- 0-2 giorni: uno o due modelli short-term ----
if has_icon_snow_depth or (icon_italia_data and icon_italia_data.get("has_snow_depth_data")): if short_term_list:
icon_display = MODEL_NAMES.get("italia_meteo_arpae_icon_2i", "ICON Italia") if len(short_term_list) >= 2:
merged["models_used"].append(f"{model_display} + {icon_display} (snow_depth) (0-{cutoff_day+1}d)") # Mediana ICON Italia + AROME HD; snow_depth e showers solo da ICON Italia
else: short_daily_by_model = [(m, d.get("daily", {}) or {}) for m, d in short_term_list]
merged["models_used"].append(f"{model_display} (0-{cutoff_day+1}d)") short_hourly_by_model = [(m, d.get("hourly", {}) or {}) for m, d in short_term_list]
short_daily = short_term_data.get("daily", {}) merged_short_daily = _merge_daily_median(short_daily_by_model, single_source_keys=DAILY_KEYS_ICON_ONLY, single_source_model="italia_meteo_arpae_icon_2i")
short_hourly = short_term_data.get("hourly", {}) merged_short_hourly = _merge_hourly_median(short_hourly_by_model, single_source_keys=HOURLY_KEYS_ICON_ONLY, single_source_model="italia_meteo_arpae_icon_2i")
short_daily_times = (merged_short_daily.get("time") or [])[:cutoff_day + 1] if long_term_list else (merged_short_daily.get("time") or [])
# Prendi dati daily dai primi giorni del modello a breve termine short_hourly_times = merged_short_hourly.get("time") or []
short_daily_times = short_daily.get("time", [])[:cutoff_day+1] cutoff_h = (cutoff_day + 1) * 24 if long_term_list else len(short_hourly_times)
for i, day_time in enumerate(short_daily_times): short_hourly_times = short_hourly_times[:cutoff_h]
merged["daily"]["time"].append(day_time) names_short = " + ".join(MODEL_NAMES.get(m, m) for m, _ in short_term_list[:2])
for key in ["temperature_2m_max", "temperature_2m_min", "precipitation_sum", "precipitation_hours", "precipitation_probability_max", "snowfall_sum", "showers_sum", "rain_sum", "weathercode", "windspeed_10m_max", "windgusts_10m_max"]: merged["models_used"].append(f"{names_short} (mediana) (0-{len(short_daily_times)}d)")
val = short_daily.get(key, [])[i] if i < len(short_daily.get(key, [])) else None for i, day_time in enumerate(short_daily_times):
merged["daily"][key].append(val) merged["daily"]["time"].append(day_time)
for key in daily_keys:
# Prendi dati hourly dal modello a breve termine if key == "time":
# Priorità: usa snow_depth da ICON Italia se disponibile, altrimenti dal modello principale
short_hourly_times = short_hourly.get("time", [])
icon_italia_hourly = icon_italia_data.get("hourly", {}) if icon_italia_data else {}
icon_italia_hourly_times = icon_italia_hourly.get("time", []) if icon_italia_hourly else []
icon_italia_snow_depth = icon_italia_hourly.get("snow_depth", []) if icon_italia_hourly else []
# Crea mappa timestamp -> snow_depth per ICON Italia (per corrispondenza esatta o approssimata)
icon_snow_depth_map = {}
if icon_italia_hourly_times and icon_italia_snow_depth:
for idx, ts in enumerate(icon_italia_hourly_times):
if idx < len(icon_italia_snow_depth) and icon_italia_snow_depth[idx] is not None:
try:
val_cm = float(icon_italia_snow_depth[idx])
if val_cm >= 0: # Solo valori validi (già in cm)
icon_snow_depth_map[ts] = val_cm
except (ValueError, TypeError):
continue continue
arr = merged_short_daily.get(key, [])
merged["daily"][key].append(arr[i] if i < len(arr) else None)
for i, hour_time in enumerate(short_hourly_times):
merged["hourly"]["time"].append(hour_time)
for key in hourly_keys:
if key == "time":
continue
arr = merged_short_hourly.get(key, [])
merged["hourly"][key].append(arr[i] if i < len(arr) else None)
else:
# Un solo modello short-term (es. best_match o fallback)
short_term_model, short_term_data = short_term_list[0]
short_daily = short_term_data.get("daily", {}) or {}
short_hourly = short_term_data.get("hourly", {}) or {}
short_daily_times_all = short_daily.get("time", []) or []
short_daily_times = short_daily_times_all[:cutoff_day + 1] if long_term_list else short_daily_times_all
model_display = "Best Match" if short_term_model == "best_match" else MODEL_NAMES.get(short_term_model, short_term_model)
merged["models_used"].append(f"{model_display} (0-{len(short_daily_times)}d)")
for i, day_time in enumerate(short_daily_times):
merged["daily"]["time"].append(day_time)
for key in daily_keys:
if key == "time":
continue
arr = short_daily.get(key, [])
merged["daily"][key].append(arr[i] if i < len(arr) else None)
short_hourly_times = short_hourly.get("time", []) or []
cutoff_h = (cutoff_day + 1) * 24 if long_term_list else len(short_hourly_times)
for i, hour_time in enumerate(short_hourly_times[:cutoff_h]):
merged["hourly"]["time"].append(hour_time)
for key in hourly_keys:
if key == "time":
continue
arr = short_hourly.get(key, [])
merged["hourly"][key].append(arr[i] if i < len(arr) else None)
ensure_merged_keys(merged, merged["daily"]["time"], merged["hourly"]["time"])
cutoff_hour = (cutoff_day + 1) * 24 # ---- 3-10 giorni: uno o più modelli long-term ----
for i, hour_time in enumerate(short_hourly_times[:cutoff_hour]): if long_term_list:
merged["hourly"]["time"].append(hour_time) if len(long_term_list) >= 2:
for key in ["temperature_2m", "precipitation", "snowfall", "rain", "weathercode", "windspeed_10m", "winddirection_10m", "dewpoint_2m", "precipitation_probability", "cloud_cover", "soil_temperature_0cm"]: long_daily_by_model = [(m, d.get("daily", {}) or {}) for m, d in long_term_list]
val = short_hourly.get(key, [])[i] if i < len(short_hourly.get(key, [])) else None long_hourly_by_model = [(m, d.get("hourly", {}) or {}) for m, d in long_term_list]
merged["hourly"][key].append(val) merged_long_daily = _merge_daily_median(long_daily_by_model, single_source_keys=DAILY_KEYS_ICON_ONLY, single_source_model="italia_meteo_arpae_icon_2i")
# Per snow_depth: usa ICON Italia se disponibile (corrispondenza per timestamp), altrimenti modello principale merged_long_hourly = _merge_hourly_median(long_hourly_by_model, single_source_keys=HOURLY_KEYS_ICON_ONLY, single_source_model="italia_meteo_arpae_icon_2i")
# NOTA: I valori sono già convertiti in cm durante il recupero dall'API long_daily_times = merged_long_daily.get("time") or []
val_snow_depth = None long_hourly_times = merged_long_hourly.get("time") or []
# Cerca corrispondenza esatta per timestamp names_long = " + ".join(MODEL_NAMES.get(m, m) for m, _ in long_term_list[:3])
if hour_time in icon_snow_depth_map: # Allinea al numero effettivo di giorni/orari short (non indice fisso): evita buco del 3° giorno
# Usa snow_depth da ICON Italia per questo timestamp (già in cm) start_idx = len(merged["daily"]["time"])
val_snow_depth = icon_snow_depth_map[hour_time] start_hour_idx = len(merged["hourly"]["time"])
else: merged["models_used"].append(f"{names_long} (mediana) (giorno {start_idx + 1}-{forecast_days}d)")
# Fallback 1: cerca corrispondenza per ora approssimata (se i timestamp non corrispondono esattamente) for i, day_time in enumerate(long_daily_times):
# Estrai solo la parte ora (YYYY-MM-DDTHH) per corrispondenza approssimata if i < start_idx:
hour_time_base = hour_time[:13] if len(hour_time) >= 13 else hour_time # "2025-01-09T12" continue
for icon_ts, icon_val in icon_snow_depth_map.items(): if i >= forecast_days:
if icon_ts.startswith(hour_time_base): break
val_snow_depth = icon_val merged["daily"]["time"].append(day_time)
break for key in daily_keys:
# Fallback 2: se non trovato, cerca il valore più vicino nello stesso giorno if key == "time":
if val_snow_depth is None and hour_time_base: continue
day_date_str = hour_time[:10] if len(hour_time) >= 10 else None # "2025-01-09" arr = merged_long_daily.get(key, [])
if day_date_str: merged["daily"][key].append(arr[i] if i < len(arr) else None)
# Cerca tutti i valori di ICON Italia per lo stesso giorno needed_hours = forecast_days * 24
same_day_values = [v for ts, v in icon_snow_depth_map.items() if ts.startswith(day_date_str)]
if same_day_values:
# Usa il primo valore disponibile per quel giorno (approssimazione)
val_snow_depth = same_day_values[0]
# Fallback 3: usa snow_depth dal modello principale se ICON Italia non disponibile
if val_snow_depth is None and i < len(short_hourly.get("snow_depth", [])):
val_snow_depth = short_hourly.get("snow_depth", [])[i]
merged["hourly"]["snow_depth"].append(val_snow_depth)
if long_term_data:
merged["models_used"].append(f"{MODEL_NAMES.get(long_term_model, long_term_model)} ({cutoff_day+1}-{forecast_days}d)")
long_daily = long_term_data.get("daily", {})
long_hourly = long_term_data.get("hourly", {})
# Prendi dati daily dal modello a lungo termine per i giorni successivi
long_daily_times = long_daily.get("time", [])
start_idx = cutoff_day + 1
for i in range(start_idx, min(len(long_daily_times), forecast_days)):
merged["daily"]["time"].append(long_daily_times[i])
for key in ["temperature_2m_max", "temperature_2m_min", "precipitation_sum", "precipitation_hours", "precipitation_probability_max", "snowfall_sum", "showers_sum", "rain_sum", "weathercode", "windspeed_10m_max", "windgusts_10m_max"]:
val = long_daily.get(key, [])[i] if i < len(long_daily.get(key, [])) else None
merged["daily"][key].append(val)
# Per i dati hourly, completa con dati a lungo termine se necessario
long_hourly_times = long_hourly.get("time", [])
current_hourly_count = len(merged["hourly"]["time"])
needed_hours = forecast_days * 24
if current_hourly_count < needed_hours:
start_hour_idx = current_hourly_count
for i in range(start_hour_idx, min(len(long_hourly_times), needed_hours)): for i in range(start_hour_idx, min(len(long_hourly_times), needed_hours)):
merged["hourly"]["time"].append(long_hourly_times[i]) merged["hourly"]["time"].append(long_hourly_times[i])
for key in ["temperature_2m", "precipitation", "snowfall", "snow_depth", "rain", "weathercode", "windspeed_10m", "winddirection_10m", "dewpoint_2m", "precipitation_probability", "cloud_cover", "soil_temperature_0cm"]: for key in hourly_keys:
val = long_hourly.get(key, [])[i] if i < len(long_hourly.get(key, [])) else None if key == "time":
merged["hourly"][key].append(val) continue
arr = merged_long_hourly.get(key, [])
merged["hourly"][key].append(arr[i] if i < len(arr) else None)
else:
long_term_model, long_term_data = long_term_list[0]
long_daily = long_term_data.get("daily", {}) or {}
long_hourly = long_term_data.get("hourly", {}) or {}
start_idx = len(merged["daily"]["time"])
merged["models_used"].append(f"{MODEL_NAMES.get(long_term_model, long_term_model)} (giorno {start_idx + 1}-{forecast_days}d)")
long_daily_times = long_daily.get("time", []) or []
for i in range(start_idx, min(len(long_daily_times), forecast_days)):
merged["daily"]["time"].append(long_daily_times[i])
for key in daily_keys:
if key == "time":
continue
arr = long_daily.get(key, [])
merged["daily"][key].append(arr[i] if i < len(arr) else None)
long_hourly_times = long_hourly.get("time", []) or []
current_hourly_count = len(merged["hourly"]["time"])
needed_hours = forecast_days * 24
for i in range(current_hourly_count, min(len(long_hourly_times), needed_hours)):
merged["hourly"]["time"].append(long_hourly_times[i])
for key in hourly_keys:
if key == "time":
continue
arr = long_hourly.get(key, [])
merged["hourly"][key].append(arr[i] if i < len(arr) else None)
ensure_merged_keys(merged, merged["daily"]["time"], merged["hourly"]["time"])
return merged return merged
@@ -792,16 +932,13 @@ def analyze_daily_events(times, codes, probs, precip, winds, temps, dewpoints, s
block_precip = precip[start_idx:end_idx] if end_idx <= len(precip) else precip[start_idx:] block_precip = precip[start_idx:end_idx] if end_idx <= len(precip) else precip[start_idx:]
block_precip_clean = [p for p in block_precip if p is not None] block_precip_clean = [p for p in block_precip if p is not None]
tot_mm = sum(block_precip_clean) tot_mm = sum(block_precip_clean)
prob_block = probs[start_idx:end_idx] if end_idx <= len(probs) else probs[start_idx:]
prob_block_clean = [p for p in prob_block if p is not None]
max_prob = max(prob_block_clean) if prob_block_clean else 0
start_time = times[start_idx].split("T")[1][:5] start_time = times[start_idx].split("T")[1][:5]
end_time = times[end_idx].split("T")[1][:5] if end_idx < len(times) else times[-1].split("T")[1][:5] end_time = times[end_idx].split("T")[1][:5] if end_idx < len(times) else times[-1].split("T")[1][:5]
avg_intensity = tot_mm / len(block_precip) if block_precip else 0 avg_intensity = tot_mm / len(block_precip) if block_precip else 0
events.append( events.append(
f"{current_rain_type} ({get_intensity_label(avg_intensity)}):\n" f"{current_rain_type} ({get_intensity_label(avg_intensity)}):\n"
f" 🕒 {start_time}-{end_time} | 💧 {tot_mm:.1f}mm | Prob: {max_prob}%" f" 🕒 {start_time}-{end_time} | 💧 {tot_mm:.1f}mm"
) )
start_idx = i start_idx = i
current_rain_type = new_type current_rain_type = new_type
@@ -813,16 +950,13 @@ def analyze_daily_events(times, codes, probs, precip, winds, temps, dewpoints, s
tot_mm = sum(block_precip_clean) tot_mm = sum(block_precip_clean)
if tot_mm > 0: if tot_mm > 0:
prob_block = probs[start_idx:end_idx] if end_idx <= len(probs) else probs[start_idx:]
prob_block_clean = [p for p in prob_block if p is not None]
max_prob = max(prob_block_clean) if prob_block_clean else 0
start_time = times[start_idx].split("T")[1][:5] start_time = times[start_idx].split("T")[1][:5]
end_time = times[min(end_idx-1, len(times)-1)].split("T")[1][:5] end_time = times[min(end_idx-1, len(times)-1)].split("T")[1][:5]
avg_intensity = tot_mm / len(block_precip) if block_precip else 0 avg_intensity = tot_mm / len(block_precip) if block_precip else 0
events.append( events.append(
f"{current_rain_type} ({get_intensity_label(avg_intensity)}):\n" f"{current_rain_type} ({get_intensity_label(avg_intensity)}):\n"
f" 🕒 {start_time}-{end_time} | 💧 {tot_mm:.1f}mm | Prob: {max_prob}%" f" 🕒 {start_time}-{end_time} | 💧 {tot_mm:.1f}mm"
) )
# 3. VENTO # 3. VENTO
@@ -1225,12 +1359,7 @@ def format_weather_context_report(models_data, location_name, country_code):
wcode = int(wcode) if wcode is not None else 0 wcode = int(wcode) if wcode is not None else 0
# Calcola probabilità e tipo precipitazione per il giorno (PRIMA di determinare icona) # Calcola probabilità e tipo precipitazione per il giorno (PRIMA di determinare icona)
precip_prob = 0
precip_type = None precip_type = None
if d_probs and any(p is not None for p in d_probs):
prob_values = [p for p in d_probs if p is not None]
precip_prob = max(prob_values) if prob_values else 0
# Determina tipo precipitazione usando dati daily (più affidabili) # Determina tipo precipitazione usando dati daily (più affidabili)
# Usa snowfall_sum, rain_sum, showers_sum per caratterizzazione precisa # Usa snowfall_sum, rain_sum, showers_sum per caratterizzazione precisa
# PRIORITÀ: Se snow_depth > 0 (modello italia_meteo_arpae_icon_2i), usa questi dati per determinare neve # PRIORITÀ: Se snow_depth > 0 (modello italia_meteo_arpae_icon_2i), usa questi dati per determinare neve
@@ -1369,14 +1498,6 @@ def format_weather_context_report(models_data, location_name, country_code):
else: else:
weather_icon = "☁️" # Nuvoloso weather_icon = "☁️" # Nuvoloso
# Recupera probabilità max daily se disponibile
prob_max_list = daily.get('precipitation_probability_max', [])
precip_prob_max = None
if count < len(prob_max_list) and prob_max_list[count] is not None:
precip_prob_max = int(prob_max_list[count])
elif precip_prob > 0:
precip_prob_max = int(precip_prob)
# Calcola spessore manto nevoso (snow_depth) per questo giorno # Calcola spessore manto nevoso (snow_depth) per questo giorno
# Nota: snow_depth è disponibile solo per alcuni modelli (es. ICON Italia, ICON-D2) # Nota: snow_depth è disponibile solo per alcuni modelli (es. ICON Italia, ICON-D2)
# Se il modello non supporta snow_depth, tutti i valori saranno None o mancanti # Se il modello non supporta snow_depth, tutti i valori saranno None o mancanti
@@ -1449,7 +1570,6 @@ def format_weather_context_report(models_data, location_name, country_code):
"t_min": t_min, "t_min": t_min,
"t_max": t_max, "t_max": t_max,
"precip_sum": precip_sum, "precip_sum": precip_sum,
"precip_prob": precip_prob_max if precip_prob_max is not None else precip_prob,
"precip_type": precip_type, "precip_type": precip_type,
"snowfall_sum": snowfall_sum, "snowfall_sum": snowfall_sum,
"rain_sum": rain_sum, "rain_sum": rain_sum,
@@ -1497,13 +1617,6 @@ def format_weather_context_report(models_data, location_name, country_code):
line += f" | {' + '.join(precip_parts)}" line += f" | {' + '.join(precip_parts)}"
# Aggiungi probabilità se disponibile
if day_info['precip_prob'] and day_info['precip_prob'] > 0:
line += f" ({int(day_info['precip_prob'])}%)"
elif day_info['precip_prob'] > 50:
# Probabilità alta ma nessuna precipitazione prevista (può essere un errore del modello)
line += f" | 💧 Possibile ({int(day_info['precip_prob'])}%)"
# Aggiungi vento (sempre se disponibile, formattato come direzione intensità) # Aggiungi vento (sempre se disponibile, formattato come direzione intensità)
if day_info['wind_max'] > 0: if day_info['wind_max'] > 0:
wind_str = f"{day_info['wind_dir']} {day_info['wind_max']:.0f}km/h" wind_str = f"{day_info['wind_dir']} {day_info['wind_max']:.0f}km/h"
File diff suppressed because it is too large Load Diff
+83 -40
View File
@@ -19,15 +19,14 @@ from open_meteo_client import configure_open_meteo_session
# snow_radar.py # snow_radar.py
# #
# Scopo: # Scopo:
# Analizza la nevicata in una griglia di località in un raggio di 40km da San Marino. # Analizza la neve in una griglia di località in un raggio di 40km da San Marino.
# Per ciascuna località mostra: # Combina due parametri Open-Meteo:
# - Nome della località # - snowfall: precipitazione nevosa (cm/h) - neve che cade
# - Somma dello snowfall orario nelle 12 ore precedenti # - snow_depth: spessore manto al suolo (m) - neve accumulata, incluso residuo
# - Somma dello snowfall previsto nelle 12 ore successive # senza precipitazione (es. giorni successivi a nevicata)
# - Somma dello snowfall previsto nelle 24 ore successive
# #
# Modello meteo: # Modello meteo:
# meteofrance_seamless (AROME) per dati dettagliati # italia_meteo_arpae_icon_2i (supporta snowfall e snow_depth)
# #
# Token Telegram: # Token Telegram:
# Nessun token in chiaro. Lettura in ordine: # Nessun token in chiaro. Lettura in ordine:
@@ -88,8 +87,14 @@ LOCATIONS = [
TZ = "Europe/Berlin" TZ = "Europe/Berlin"
TZINFO = ZoneInfo(TZ) TZINFO = ZoneInfo(TZ)
# Modello meteo # Modello meteo: italia_meteo_arpae_icon_2i supporta snowfall e snow_depth
MODEL_AROME = "meteofrance_seamless" # - snowfall: precipitazione nevosa (cm/h)
# - snow_depth: spessore manto al suolo (m), include neve residua anche senza precipitazione
MODEL_SNOW = "italia_meteo_arpae_icon_2i"
# Soglia minima (cm) per considerare "neve presente" - evita falsi positivi da rumore/modello
# Valori < 1 cm sono tracce trascurabili (dew, frost, errori numerici) - non neve reale
SNOW_THRESHOLD_CM = 1.0
# File di log # File di log
BASE_DIR = os.path.dirname(os.path.abspath(__file__)) BASE_DIR = os.path.dirname(os.path.abspath(__file__))
@@ -189,14 +194,16 @@ def calculate_distance_km(lat1: float, lon1: float, lat2: float, lon2: float) ->
def get_forecast(session: requests.Session, lat: float, lon: float) -> Optional[Dict]: def get_forecast(session: requests.Session, lat: float, lon: float) -> Optional[Dict]:
""" """
Recupera previsioni meteo per una località. Recupera previsioni meteo per una località.
Inclusi: snowfall (precipitazione nevosa cm/h), snow_depth (manto al suolo m).
""" """
params = { params = {
"latitude": lat, "latitude": lat,
"longitude": lon, "longitude": lon,
"hourly": "snowfall,weathercode", "hourly": "snowfall,snow_depth,weathercode",
"timezone": TZ, "timezone": TZ,
"forecast_days": 2, "past_days": 7,
"models": MODEL_AROME, "forecast_days": 7,
"models": MODEL_SNOW,
} }
try: try:
@@ -217,22 +224,25 @@ def get_forecast(session: requests.Session, lat: float, lon: float) -> Optional[
def analyze_snowfall_for_location(data: Dict, now: datetime.datetime) -> Optional[Dict]: def analyze_snowfall_for_location(data: Dict, now: datetime.datetime) -> Optional[Dict]:
""" """
Analizza snowfall per una località. Analizza snowfall e snow_depth per una località.
Nota: Open-Meteo fornisce principalmente dati futuri. Per i dati passati, Combina:
includiamo anche le ore appena passate se disponibili nei dati hourly. - snowfall: precipitazione nevosa (cm/h) - neve che cade
- snow_depth: spessore manto al suolo (m) - neve accumulata, incluso residuo senza precipitazione
Returns: Returns:
Dict con: Dict con valori in cm:
- snow_past_12h: somma snowfall ultime 12 ore (cm) - se disponibile nei dati - snow_past_12h: max(somma snowfall ultime 12h, snow_depth attuale)
- snow_next_12h: somma snowfall prossime 12 ore (cm) - snow_next_12h: somma snowfall prossime 12h
- snow_next_24h: somma snowfall prossime 24 ore (cm) - snow_next_24h: max(somma snowfall prossime 24h, snow_depth max previsto)
- snow_depth_now_cm: manto attuale al suolo (cm)
""" """
hourly = data.get("hourly", {}) or {} hourly = data.get("hourly", {}) or {}
times = hourly.get("time", []) or [] times = hourly.get("time", []) or []
snowfall = hourly.get("snowfall", []) or [] snowfall = hourly.get("snowfall", []) or []
snow_depth_raw = hourly.get("snow_depth", []) or [] # in metri (m)
if not times or not snowfall: if not times:
return None return None
# Converti timestamps # Converti timestamps
@@ -243,29 +253,54 @@ def analyze_snowfall_for_location(data: Dict, now: datetime.datetime) -> Optiona
next_12h_end = now + datetime.timedelta(hours=12) next_12h_end = now + datetime.timedelta(hours=12)
next_24h_end = now + datetime.timedelta(hours=24) next_24h_end = now + datetime.timedelta(hours=24)
snow_past_12h = 0.0 snowfall_past_12h = 0.0
snow_next_12h = 0.0 snowfall_next_12h = 0.0
snow_next_24h = 0.0 snowfall_next_24h = 0.0
snow_depth_now_m = 0.0
snow_depth_max_past_12h_m = 0.0
snow_depth_max_next_24h_m = 0.0
# Rumore numerico: valori < 0.01 cm (snowfall) o < 0.0001 m (snow_depth) → 0
NOISE_FLOOR_SNOWFALL_CM = 0.01
NOISE_FLOOR_SNOW_DEPTH_M = 0.0001
for i, dt in enumerate(dt_list): for i, dt in enumerate(dt_list):
snow_val = float(snowfall[i]) if i < len(snowfall) and snowfall[i] is not None else 0.0 snow_val = float(snowfall[i]) if i < len(snowfall) and snowfall[i] is not None else 0.0
depth_val = float(snow_depth_raw[i]) if i < len(snow_depth_raw) and snow_depth_raw[i] is not None else 0.0
if snow_val < NOISE_FLOOR_SNOWFALL_CM:
snow_val = 0.0
if depth_val < NOISE_FLOOR_SNOW_DEPTH_M:
depth_val = 0.0
# Ultime 12 ore (passato) - solo se i dati includono il passato # Snowfall nelle finestre temporali
if dt < now and dt >= past_12h_start: if dt < now and dt >= past_12h_start:
snow_past_12h += snow_val snowfall_past_12h += snow_val
snow_depth_max_past_12h_m = max(snow_depth_max_past_12h_m, depth_val)
# Prossime 12 ore
if now <= dt < next_12h_end: if now <= dt < next_12h_end:
snow_next_12h += snow_val snowfall_next_12h += snow_val
# Prossime 24 ore
if now <= dt < next_24h_end: if now <= dt < next_24h_end:
snow_next_24h += snow_val snowfall_next_24h += snow_val
snow_depth_max_next_24h_m = max(snow_depth_max_next_24h_m, depth_val)
# snow_depth attuale: usa valore più vicino a "now" (ultima ora passata o prima futura)
if dt <= now:
snow_depth_now_m = depth_val
# snow_depth da m a cm
snow_depth_now_cm = snow_depth_now_m * 100.0
snow_depth_max_past_12h_cm = snow_depth_max_past_12h_m * 100.0
snow_depth_max_next_24h_cm = snow_depth_max_next_24h_m * 100.0
# Combina precipitazione + manto: per passato usa max(somma precipitazione, manto attuale)
# per futuro usa max(somma precipitazione, manto max previsto)
snow_past_12h = max(snowfall_past_12h, snow_depth_now_cm)
snow_next_24h = max(snowfall_next_24h, snow_depth_max_next_24h_cm)
return { return {
"snow_past_12h": snow_past_12h, "snow_past_12h": snow_past_12h,
"snow_next_12h": snow_next_12h, "snow_next_12h": snowfall_next_12h,
"snow_next_24h": snow_next_24h, "snow_next_24h": snow_next_24h,
"snow_depth_now_cm": snow_depth_now_cm,
} }
@@ -314,6 +349,9 @@ def generate_snow_map(results: List[Dict], center_lat: float, center_lon: float,
totals = [r.get(data_field, 0.0) for r in results] totals = [r.get(data_field, 0.0) for r in results]
max_total = max(totals) if totals else 1.0 max_total = max(totals) if totals else 1.0
min_total = min(totals) if totals else 0.0 min_total = min(totals) if totals else 0.0
# Evita vmin==vmax (divisione per zero nel colormap) - tutti 0 → scala 0..1
if max_total <= min_total:
max_total = max(min_total + 0.1, 1.0)
# Estrai coordinate # Estrai coordinate
lats = [r["lat"] for r in results] lats = [r["lat"] for r in results]
@@ -457,9 +495,10 @@ def generate_snow_map(results: List[Dict], center_lat: float, center_lon: float,
ax.legend(handles=legend_elements, loc='lower left', fontsize=10, ax.legend(handles=legend_elements, loc='lower left', fontsize=10,
framealpha=0.95, edgecolor='black', fancybox=True, shadow=True) framealpha=0.95, edgecolor='black', fancybox=True, shadow=True)
# Info timestamp spostata in alto a destra # Info timestamp spostata in alto a destra (Località con neve = solo quelle con neve sopra soglia)
now = now_local() now = now_local()
info_text = f"Aggiornamento: {now.strftime('%d/%m/%Y %H:%M')}\nLocalità con neve: {len(results)}" num_with_snow = sum(1 for r in results if r.get("has_snow", False))
info_text = f"Aggiornamento: {now.strftime('%d/%m/%Y %H:%M')}\nLocalità con neve: {num_with_snow}"
ax.text(0.98, 0.98, info_text, transform=ax.transAxes, ax.text(0.98, 0.98, info_text, transform=ax.transAxes,
fontsize=9, verticalalignment='top', horizontalalignment='right', fontsize=9, verticalalignment='top', horizontalalignment='right',
bbox=dict(boxstyle='round,pad=0.5', facecolor='white', alpha=0.9, bbox=dict(boxstyle='round,pad=0.5', facecolor='white', alpha=0.9,
@@ -660,11 +699,13 @@ def main(chat_ids: Optional[List[str]] = None, debug_mode: bool = False, chat_id
continue continue
# Aggiungi sempre Casa, anche se non c'è neve # Aggiungi sempre Casa, anche se non c'è neve
# Per le altre località, aggiungi solo se c'è neve (passata o prevista) # Per le altre località, aggiungi solo se c'è neve sopra soglia (precipitazione o manto residuo)
is_casa = loc["name"] == "Casa (Strada Cà Toro)" is_casa = loc["name"] == "Casa (Strada Cà Toro)"
has_snow = (snow_analysis["snow_past_12h"] > 0.0 or has_snow = (
snow_analysis["snow_next_12h"] > 0.0 or snow_analysis["snow_past_12h"] >= SNOW_THRESHOLD_CM or
snow_analysis["snow_next_24h"] > 0.0) snow_analysis["snow_next_12h"] >= SNOW_THRESHOLD_CM or
snow_analysis["snow_next_24h"] >= SNOW_THRESHOLD_CM
)
if is_casa or has_snow: if is_casa or has_snow:
results.append({ results.append({
@@ -672,6 +713,7 @@ def main(chat_ids: Optional[List[str]] = None, debug_mode: bool = False, chat_id
"lat": loc["lat"], "lat": loc["lat"],
"lon": loc["lon"], "lon": loc["lon"],
"distance_km": distance_km, "distance_km": distance_km,
"has_snow": has_snow,
**snow_analysis **snow_analysis
}) })
@@ -687,6 +729,7 @@ def main(chat_ids: Optional[List[str]] = None, debug_mode: bool = False, chat_id
# Genera e invia DUE mappe separate # Genera e invia DUE mappe separate
now_str = now.strftime('%d/%m/%Y %H:%M') now_str = now.strftime('%d/%m/%Y %H:%M')
num_with_snow = sum(1 for r in results if r.get("has_snow", False))
# 1. Mappa snowfall passato (12h precedenti) # 1. Mappa snowfall passato (12h precedenti)
map_path_past = os.path.join(BASE_DIR, "snow_radar_past.png") map_path_past = os.path.join(BASE_DIR, "snow_radar_past.png")
@@ -698,7 +741,7 @@ def main(chat_ids: Optional[List[str]] = None, debug_mode: bool = False, chat_id
f"❄️ *SNOW RADAR - Ultime 12h*\n" f"❄️ *SNOW RADAR - Ultime 12h*\n"
f"📍 Centro: San Marino\n" f"📍 Centro: San Marino\n"
f"🕒 {now_str}\n" f"🕒 {now_str}\n"
f"📊 Località con neve: {len(results)}/{len(LOCATIONS)}" f"📊 Località con neve: {num_with_snow}/{len(LOCATIONS)}"
) )
telegram_send_photo(map_path_past, caption_past, chat_ids=chat_ids) telegram_send_photo(map_path_past, caption_past, chat_ids=chat_ids)
# Pulisci file temporaneo # Pulisci file temporaneo
@@ -718,7 +761,7 @@ def main(chat_ids: Optional[List[str]] = None, debug_mode: bool = False, chat_id
f"❄️ *SNOW RADAR - Prossime 24h*\n" f"❄️ *SNOW RADAR - Prossime 24h*\n"
f"📍 Centro: San Marino\n" f"📍 Centro: San Marino\n"
f"🕒 {now_str}\n" f"🕒 {now_str}\n"
f"📊 Località con neve: {len(results)}/{len(LOCATIONS)}" f"📊 Località con neve: {num_with_snow}/{len(LOCATIONS)}"
) )
telegram_send_photo(map_path_future, caption_future, chat_ids=chat_ids) telegram_send_photo(map_path_future, caption_future, chat_ids=chat_ids)
# Pulisci file temporaneo # Pulisci file temporaneo