817 lines
33 KiB
Python
817 lines
33 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
Fotovoltaico: analisi e previsione produzione con modello ICON Italia (ItaliaMeteo ARPAE).
|
||
- Previsione 72h future (Weather Forecast API)
|
||
- Previsione teorica 48h passate (Historical Forecast API) per confronto con produzione reale
|
||
- Produzione reale (opzionale) da API SolarEdge Monitoring
|
||
- Grafici per colpo d'occhio
|
||
|
||
Configurazione impianto:
|
||
- Inverter: 6 kW (SolarEdge)
|
||
- 22 pannelli Longi Solar (415 Wp ciascuno, totale 9,13 kWp)
|
||
- Orientamenti: vedi PANEL_GROUPS
|
||
|
||
SolarEdge: imposta SOLAREDGE_API_KEY e SOLAREDGE_SITE_ID (env o file .env / ~/.solaredge_fotovoltaico)
|
||
per mostrare la produzione reale sul grafico 48h passate.
|
||
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import glob
|
||
import logging
|
||
import os
|
||
import sys
|
||
from datetime import datetime, timedelta
|
||
from io import BytesIO
|
||
from typing import Any, Dict, List, Optional, Tuple
|
||
|
||
import requests
|
||
from zoneinfo import ZoneInfo
|
||
|
||
# Grafici
|
||
import matplotlib
|
||
matplotlib.use("Agg")
|
||
import matplotlib.dates as mdates
|
||
import matplotlib.pyplot as plt
|
||
import numpy as np
|
||
|
||
from open_meteo_client import open_meteo_get
|
||
|
||
# -----------------------------------------------------------------------------
# Configuration
# -----------------------------------------------------------------------------
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
TZ = "Europe/Rome"
TZINFO = ZoneInfo(TZ)

# Home location (same as meteo.py)
HOME_LAT = 43.9356
HOME_LON = 12.4296

# PV plant
INVERTER_KW = 6.0
PANEL_WP = 415.0  # Longi Solar, 415 Wp per module
NUM_PANELS = 22
P_PEAK_TOTAL_W = NUM_PANELS * PANEL_WP  # 9130 Wp = 9.13 kWp
# Thermal model (PVLib-style): panels produce more when it is cold (Wp rated @ 25°C).
TEMP_COEFF_POWER = -0.0035  # Longi Hi-MO ~ -0.35%/°C
NOCT = 45.0  # Nominal Operating Cell Temperature (°C)
SYSTEM_LOSSES = 0.10  # 10% fixed losses (cabling, inverter, soiling) → effective PR ~0.90
# Empirical correction: can be set via the PRODUCTION_CORRECTION_FACTOR env var or a config file.
# If forecasts are systematically below real production, try 1.2–1.35 (e.g. in ~/.solaredge_fotovoltaico).
PRODUCTION_CORRECTION_FACTOR = 1.0

# Panel groups: (tilt_deg, real_compass_azimuth_deg, panel_count).
# Azimuth = real observed compass bearing (0=N, 90=E, 180=S, 270=W). Do not use the values from monitoring.solaredge (often wrong).
# 1.1.1 at 72°; 1.1.2-10 at 158°; 1.2.1-2 at 253°; 1.2.3-10 + 1.2.12 at 72°; 1.2.11 at 190°. The two 72° groups are merged into one.
PANEL_GROUPS = [
    (34, 72, 10),  # 1.1.1 + 1.2.3-10 + 1.2.12 (East)
    (34, 158, 9),  # 1.1.2 - 1.1.10 (South-South-East)
    (34, 253, 2),  # 1.2.1, 1.2.2 (West-South-West)
    (34, 190, 1),  # 1.2.11 (South-South-West)
]

# Open-Meteo API: only global_tilted_irradiance (W/m²) is used, with &tilt= and &azimuth=.
# It is the actual irradiance on the panel plane (tilted direct + diffuse); separate
# direct_radiation/diffuse_radiation are not needed. shortwave_radiation (horizontal GHI) is not used.
FORECAST_URL = "https://api.open-meteo.com/v1/forecast"
HISTORICAL_FORECAST_URL = "https://historical-forecast-api.open-meteo.com/v1/forecast"
MODEL_ICON_ITALIA = "italia_meteo_arpae_icon_2i"
MODEL_AROME_HD = "meteofrance_arome_france_hd"  # France + neighbouring areas; may return no data outside coverage
HTTP_HEADERS = {"User-Agent": "loogle-bot-fotovoltaico/2.0"}

# Telegram
TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token")
TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token"

# SolarEdge Monitoring API (optional: real production on the 48h chart)
# API key from https://monitoring.solaredge.com (Account → API Access); Site ID from the portal
SOLAREDGE_BASE = "https://monitoringapi.solaredge.com"
SOLAREDGE_SITE_ENERGY_PATH = "/site/{site_id}/energy"
SOLAREDGE_SITE_POWER_PATH = "/site/{site_id}/power"
DEFAULT_SOLAREDGE_SITE_ID = "3079750"  # Used when SOLAREDGE_SITE_ID is not set

# Root logging is configured at import time; every function in this module logs through `logger`.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
|
||
|
||
|
||
def get_production_correction_factor() -> float:
    """Return the empirical production correction factor, clamped to [0.1, 3.0].

    Lookup order (first usable value wins):
      1. the ``PRODUCTION_CORRECTION_FACTOR`` environment variable;
      2. a ``PRODUCTION_CORRECTION_FACTOR=<value>`` line in ``~/.solaredge_fotovoltaico``;
      3. the same line in the ``.env`` file next to this script;
      4. the module-level ``PRODUCTION_CORRECTION_FACTOR`` default.

    Unparsable or NaN values are reported in the log and skipped instead of
    being silently swallowed.
    """
    import math  # local import: only this function needs it

    def _parse_factor(raw: str) -> Optional[float]:
        """Return ``raw`` as a clamped factor, or None when it is not a usable number."""
        try:
            value = float(raw)
        except ValueError:
            return None
        if math.isnan(value):
            # min()/max() clamping would silently turn NaN into 3.0.
            return None
        return max(0.1, min(3.0, value))

    env_raw = (os.environ.get("PRODUCTION_CORRECTION_FACTOR") or "").strip()
    if env_raw:
        factor = _parse_factor(env_raw)
        if factor is not None:
            return factor
        logger.warning("PRODUCTION_CORRECTION_FACTOR non valido (env): %r", env_raw)

    prefix = "PRODUCTION_CORRECTION_FACTOR="
    for path in (
        os.path.expanduser("~/.solaredge_fotovoltaico"),
        os.path.join(SCRIPT_DIR, ".env"),
    ):
        if not os.path.isfile(path):
            continue
        try:
            with open(path, "r", encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if not line.startswith(prefix):
                        continue
                    raw = line[len(prefix):].strip().strip("'\"")
                    if not raw:
                        continue
                    factor = _parse_factor(raw)
                    if factor is not None:
                        return factor
                    logger.warning("PRODUCTION_CORRECTION_FACTOR non valido in %s: %r", path, raw)
        except (OSError, UnicodeDecodeError) as e:
            # Best effort: an unreadable config file must not stop the forecast.
            logger.debug("Lettura %s: %s", path, e)
    return PRODUCTION_CORRECTION_FACTOR
|
||
|
||
|
||
def compass_to_open_meteo_azimuth(compass_deg: float) -> float:
    """Convert a compass bearing into the Open-Meteo azimuth convention.

    Compass: 0=N, 90=E, 180=S, 270=W. Open-Meteo: 0=S, -90=E, 90=W, 180=N.
    The conversion is ``compass - 180``; ``180 - compass`` would swap East and West.
    """
    south_offset_deg = 180.0
    return compass_deg - south_offset_deg
|
||
|
||
|
||
def calculate_pv_power(gti: float, temp_air: float, n_panels: int) -> float:
    """Return the DC power (kW) of one panel group, including thermal derating.

    Args:
        gti: irradiance on the panel plane (the formula treats it as W/m²).
        temp_air: air temperature (°C).
        n_panels: number of panels in the group.

    The cell temperature is estimated from the air temperature and NOCT;
    power is derated relative to 25°C by ``TEMP_COEFF_POWER`` per degree
    (a cell colder than 25°C yields more power) and reduced by
    ``SYSTEM_LOSSES``. The result is never negative.
    """
    if gti <= 0:
        return 0.0
    cell_temp_c = temp_air + (gti / 800.0) * (NOCT - 20.0)
    thermal_derating = 1.0 + (TEMP_COEFF_POWER * (cell_temp_c - 25.0))
    group_peak_kw = (n_panels * PANEL_WP) / 1000.0
    dc_kw = group_peak_kw * (gti / 1000.0) * thermal_derating * (1.0 - SYSTEM_LOSSES)
    return max(0.0, dc_kw)
|
||
|
||
|
||
def fetch_gti_forecast(
    lat: float,
    lon: float,
    tilt: float,
    azimuth_om: float,
    forecast_hours: int = 72,
    past_hours: int = 0,
    model: str = MODEL_ICON_ITALIA,
) -> Optional[Dict[str, Any]]:
    """Fetch hourly GTI and 2 m temperature from the Weather Forecast API.

    Args:
        lat, lon: location coordinates.
        tilt: panel tilt passed to the API.
        azimuth_om: panel azimuth passed to the API as ``azimuth``.
        forecast_hours: number of future hours requested.
        past_hours: number of past hours requested.
        model: weather model identifier sent as ``models``.

    Returns:
        The decoded JSON response, or None on a non-200 status or any error
        (both are logged).
    """
    query = dict(
        latitude=lat,
        longitude=lon,
        timezone="auto",
        models=model,
        hourly="global_tilted_irradiance,temperature_2m",
        tilt=tilt,
        azimuth=azimuth_om,
        forecast_hours=forecast_hours,
        past_hours=past_hours,
    )
    try:
        response = open_meteo_get(
            FORECAST_URL,
            params=query,
            headers=HTTP_HEADERS,
            timeout=(8, 30),
        )
        if response.status_code == 200:
            return response.json()
        logger.warning("Forecast API status %s: %s", response.status_code, response.text[:300])
    except Exception as e:
        logger.exception("Forecast API error: %s", e)
    return None
|
||
|
||
|
||
def fetch_gti_historical(
    lat: float,
    lon: float,
    tilt: float,
    azimuth_om: float,
    start_date: str,
    end_date: str,
) -> Optional[Dict[str, Any]]:
    """Fetch hourly GTI and 2 m temperature from the Historical Forecast API.

    Args:
        lat, lon: location coordinates.
        tilt: panel tilt passed to the API.
        azimuth_om: panel azimuth passed to the API as ``azimuth``.
        start_date, end_date: date strings forwarded as-is to the API.

    Returns:
        The decoded JSON response, or None on a non-200 status or any error
        (both are logged). The model is always ``MODEL_ICON_ITALIA``.
    """
    query = dict(
        latitude=lat,
        longitude=lon,
        start_date=start_date,
        end_date=end_date,
        timezone="auto",
        models=MODEL_ICON_ITALIA,
        hourly="global_tilted_irradiance,temperature_2m",
        tilt=tilt,
        azimuth=azimuth_om,
    )
    try:
        response = open_meteo_get(
            HISTORICAL_FORECAST_URL,
            params=query,
            headers=HTTP_HEADERS,
            timeout=(8, 45),
        )
        if response.status_code == 200:
            return response.json()
        logger.warning("Historical Forecast API status %s: %s", response.status_code, response.text[:300])
    except Exception as e:
        logger.exception("Historical Forecast API error: %s", e)
    return None
|
||
|
||
|
||
def production_from_groups(
    hourly_times: List[str],
    gti_per_group: List[List[Optional[float]]],
    temp_per_hour: List[Optional[float]],
    panel_counts: List[int],
) -> Tuple[List[float], List[float]]:
    """Compute hourly production by summing all panel groups.

    For each hour, every group with a GTI sample contributes its DC power
    (thermal model in ``calculate_pv_power``). The total is capped at the
    inverter rating and then multiplied by the correction factor.

    Args:
        hourly_times: hourly timestamps; only their count is used here.
        gti_per_group: one GTI series per group (missing samples are None).
        temp_per_hour: air temperature per hour; missing values fall back to 15.0.
        panel_counts: number of panels in each group, aligned with ``gti_per_group``.

    Returns:
        ``(gti_equivalent, power_kw)``: the panel-count-weighted GTI and the
        power in kW rounded to 3 decimals, one value per hour.
    """
    panels_in_groups = sum(panel_counts)
    if panels_in_groups != NUM_PANELS:
        logger.warning(
            "Somma pannelli per gruppo = %s (attesi %s); verifica PANEL_GROUPS",
            panels_in_groups, NUM_PANELS,
        )
    factor = get_production_correction_factor()
    groups = list(zip(gti_per_group, panel_counts))
    weighted_gti: List[float] = []
    ac_power: List[float] = []
    for hour in range(len(hourly_times)):
        # Default air temperature when the sample is missing.
        air_temp = 15.0
        if hour < len(temp_per_hour) and temp_per_hour[hour] is not None:
            air_temp = temp_per_hour[hour]
        dc_kw = 0.0
        gti_mix = 0.0
        for series, count in groups:
            if series is None or hour >= len(series):
                continue
            irradiance = series[hour]
            if irradiance is None:
                continue
            dc_kw += calculate_pv_power(irradiance, air_temp, count)
            gti_mix += irradiance * (count / panels_in_groups)
        # Inverter cap first, empirical correction afterwards.
        ac_power.append(round(min(dc_kw, INVERTER_KW) * factor, 3))
        weighted_gti.append(gti_mix)
    return weighted_gti, ac_power
|
||
|
||
|
||
def fetch_forecast_72h(lat: float, lon: float) -> Optional[Tuple[List[str], List[float], List[float]]]:
    """Return the forecast for today, tomorrow and the day after as full-day curves.

    Regardless of the time of the call, data is requested from today's
    midnight (via ``past_hours``) plus 72 future hours, then trimmed to the
    72 hours starting at midnight.

    Returns:
        ``(times, weighted_gti, power_kw)`` for at most 72 hours, or None
        when no timestamps are available.
    """
    now = datetime.now(TZINFO)
    day_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
    hours_since_midnight = int((now - day_start).total_seconds() / 3600)
    expected_len = hours_since_midnight + 72

    gti_by_group = []
    counts = []
    times_ref = None
    temps = None
    for tilt, azimuth_compass, count in PANEL_GROUPS:
        payload = fetch_gti_forecast(
            lat, lon, tilt, compass_to_open_meteo_azimuth(azimuth_compass),
            forecast_hours=72,
            past_hours=hours_since_midnight,
        )
        counts.append(count)
        if not payload or "hourly" not in payload:
            logger.warning("Forecast mancante per tilt=%s az=%s", tilt, azimuth_compass)
            gti_by_group.append([None] * expected_len)
            continue
        hourly = payload["hourly"]
        if times_ref is None:
            # The first successful response provides the time axis and temperatures.
            times_ref = hourly.get("time", [])
            temps = hourly.get("temperature_2m") or [15.0] * len(times_ref)
        series = hourly.get("global_tilted_irradiance") or hourly.get("global_tilted_irradiance_sum")
        gti_by_group.append(series or [None] * expected_len)

    times = times_ref or []
    if not times:
        return None
    gti_w, power_kw = production_from_groups(times, gti_by_group, temps, counts)

    # Keep only the 72 hours from today's midnight (today, tomorrow, day after).
    window_end = day_start + timedelta(hours=72)
    selected = [
        idx
        for idx, moment in enumerate(parse_times_to_datetime(times))
        if day_start <= moment < window_end
    ][:72]
    if not selected:
        return None
    kept_times = [times[idx] for idx in selected]
    kept_gti = [gti_w[idx] if idx < len(gti_w) else 0 for idx in selected]
    kept_power = [power_kw[idx] if idx < len(power_kw) else 0 for idx in selected]
    return kept_times, kept_gti, kept_power
|
||
|
||
|
||
def _fetch_forecast_72h_for_model(
    lat: float, lon: float, model: str
) -> Optional[Tuple[List[str], List[float]]]:
    """Same as ``fetch_forecast_72h`` but for a single weather model.

    Returns:
        ``(filtered_times, filtered_power)`` for at most 72 hours starting at
        today's midnight, or None when the model yields no timestamps
        (e.g. AROME HD outside its coverage).
    """
    current = datetime.now(TZINFO)
    window_start = current.replace(hour=0, minute=0, second=0, microsecond=0)
    elapsed_hours = int((current - window_start).total_seconds() / 3600)
    placeholder_len = elapsed_hours + 72

    irradiance_series = []
    group_sizes = []
    time_axis = None
    air_temps = None
    for tilt, azimuth_compass, count in PANEL_GROUPS:
        response = fetch_gti_forecast(
            lat, lon, tilt, compass_to_open_meteo_azimuth(azimuth_compass),
            forecast_hours=72,
            past_hours=elapsed_hours,
            model=model,
        )
        if response and "hourly" in response:
            hourly = response["hourly"]
            if time_axis is None:
                # The first successful group defines the time axis and temperatures.
                time_axis = hourly.get("time", [])
                air_temps = hourly.get("temperature_2m") or [15.0] * len(time_axis)
            gti = hourly.get("global_tilted_irradiance") or hourly.get("global_tilted_irradiance_sum")
            irradiance_series.append(gti or [None] * placeholder_len)
        else:
            logger.warning("Forecast (%s) mancante per tilt=%s az=%s", model, tilt, azimuth_compass)
            irradiance_series.append([None] * placeholder_len)
        group_sizes.append(count)

    times = time_axis or []
    if not times:
        return None
    _, power_kw = production_from_groups(times, irradiance_series, air_temps, group_sizes)

    window_end = window_start + timedelta(hours=72)
    filtered_times = []
    filtered_power = []
    for pos, moment in enumerate(parse_times_to_datetime(times)):
        inside = window_start <= moment < window_end
        if not inside or len(filtered_times) >= 72:
            continue
        filtered_times.append(times[pos])
        filtered_power.append(power_kw[pos] if pos < len(power_kw) else 0)
    if not filtered_times:
        return None
    return filtered_times, filtered_power
|
||
|
||
|
||
def fetch_forecast_72h_multi(
    lat: float, lon: float
) -> Optional[Tuple[List[str], List[Tuple[str, List[float]]]]]:
    """Return the 72h forecast from several models (ICON Italia, plus AROME HD if usable).

    Returns:
        ``(times_72, [(label, power_list), ...])`` where every power list is
        aligned to the ICON time axis, or None when ICON Italia has no data.
        AROME HD is omitted when it returns nothing or (almost) only zeros,
        which is what happens outside its coverage area.
    """
    result_icon = _fetch_forecast_72h_for_model(lat, lon, MODEL_ICON_ITALIA)
    if not result_icon:
        return None
    ref_times, power_icon = result_icon
    series: List[Tuple[str, List[float]]] = [("ICON Italia", power_icon)]

    result_arome = _fetch_forecast_72h_for_model(lat, lon, MODEL_AROME_HD)
    if not result_arome:
        logger.info("AROME HD non disponibile per questa località (copertura Francia/limitrofi)")
        return ref_times, series

    times_arome, power_arome = result_arome
    # Index of the first occurrence of each AROME timestamp: one pass instead
    # of a list scan per reference hour.
    first_index = {}
    for idx, stamp in enumerate(times_arome):
        first_index.setdefault(stamp, idx)

    # Align to the ICON hourly grid (AROME may cover fewer hours, e.g. 48);
    # hours without AROME data are filled with 0.0.
    power_aligned = []
    for stamp in ref_times:
        idx = first_index.get(stamp)
        if idx is not None and idx < len(power_arome):
            power_aligned.append(power_arome[idx])
        else:
            power_aligned.append(0.0)

    # Include AROME only when it carries useful data (outside coverage it is often all zeros).
    if sum(power_aligned) >= 0.1:
        series.append(("AROME HD", power_aligned))
        logger.info("AROME HD disponibile per confronto previsioni")
    else:
        logger.info("AROME HD senza dati utili per questa località (copertura Francia/limitrofi)")
    return ref_times, series
|
||
|
||
|
||
def fetch_historical_48h(lat: float, lon: float) -> Optional[Tuple[List[str], List[float], List[float]]]:
    """Return the theoretical forecast for the day before yesterday and yesterday.

    The window covers two full days and excludes today, so the "last 48h"
    chart never contains the current day. Each panel group is queried with
    its real (observed compass) orientation; the 2 m temperature feeds the
    thermal model.

    Returns:
        ``(times, weighted_gti, power_kw)`` or None when no timestamps are available.
    """
    today = datetime.now(TZINFO).date()
    start_date = (today - timedelta(days=2)).strftime("%Y-%m-%d")
    end_date = (today - timedelta(days=1)).strftime("%Y-%m-%d")

    gti_by_group = []
    counts = []
    time_axis = None
    temps = None
    for tilt, azimuth_compass, count in PANEL_GROUPS:
        payload = fetch_gti_historical(
            lat, lon, tilt, compass_to_open_meteo_azimuth(azimuth_compass), start_date, end_date
        )
        counts.append(count)
        if not payload or "hourly" not in payload:
            logger.warning("Historical mancante per tilt=%s az=%s", tilt, azimuth_compass)
            gti_by_group.append([])
            continue
        hourly = payload["hourly"]
        if time_axis is None:
            # The first successful group defines the time axis and temperatures.
            time_axis = hourly.get("time", [])
            temps = hourly.get("temperature_2m") or [15.0] * len(time_axis)
        series = hourly.get("global_tilted_irradiance") or hourly.get("global_tilted_irradiance_sum")
        gti_by_group.append(series or [])

    times = time_axis or []
    if not times or not gti_by_group:
        return None

    # Pad with None or truncate so every series matches the time axis length.
    hours = len(times)
    gti_by_group = [
        (series + [None] * (hours - len(series)))[:hours] for series in gti_by_group
    ]
    gti_w, power_kw = production_from_groups(times, gti_by_group, temps, counts)
    return times, gti_w, power_kw
|
||
|
||
|
||
def load_solaredge_config() -> Tuple[str, str]:
    """Return ``(api_key, site_id)`` for the SolarEdge Monitoring API.

    Each value is taken from the first source that provides it:
      1. environment variables ``SOLAREDGE_API_KEY`` / ``SOLAREDGE_SITE_ID``;
      2. ``KEY=value`` lines in ``~/.solaredge_fotovoltaico``;
      3. ``KEY=value`` lines in the ``.env`` file next to this script.

    ``DEFAULT_SOLAREDGE_SITE_ID`` is applied only after all sources have been
    consulted; applying it earlier would make a site id stored in a config
    file unreachable. ``api_key`` is an empty string when it is not
    configured anywhere.
    """
    api_key = (os.environ.get("SOLAREDGE_API_KEY") or "").strip()
    site_id = (os.environ.get("SOLAREDGE_SITE_ID") or "").strip()
    if api_key and site_id:
        return api_key, site_id

    for path in (
        os.path.expanduser("~/.solaredge_fotovoltaico"),
        os.path.join(SCRIPT_DIR, ".env"),
    ):
        if not os.path.isfile(path):
            continue
        try:
            with open(path, "r", encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if not line or line.startswith("#") or "=" not in line:
                        continue
                    key, _, value = line.partition("=")
                    key = key.strip()
                    value = value.strip().strip('"\'')
                    # Values already found (env or an earlier line/file) take precedence.
                    if key == "SOLAREDGE_API_KEY":
                        api_key = api_key or value
                    elif key == "SOLAREDGE_SITE_ID":
                        site_id = site_id or value
        except (OSError, UnicodeDecodeError) as e:
            # Best effort: an unreadable file just means "not configured here".
            logger.debug("Lettura %s: %s", path, e)
        if api_key and site_id:
            break
    return api_key, site_id or DEFAULT_SOLAREDGE_SITE_ID
|
||
|
||
|
||
def fetch_solaredge_energy_48h(api_key: str, site_id: str) -> Optional[Tuple[List[datetime], List[float]]]:
    """Fetch real production for the day before yesterday and yesterday.

    Uses the same window as the 48h chart (today excluded) and requests the
    site energy endpoint with ``timeUnit=HOUR``.

    Returns:
        ``(timestamps, values)`` where each value is the API value divided by
        1000 and rounded to 3 decimals; entries with a null value or an
        unparsable date are skipped. Returns None on a non-200 status, an
        empty result or any error (logged).
    """
    today = datetime.now(TZINFO).date()
    query = {
        "api_key": api_key,
        "startDate": (today - timedelta(days=2)).strftime("%Y-%m-%d"),
        "endDate": (today - timedelta(days=1)).strftime("%Y-%m-%d"),
        "timeUnit": "HOUR",
    }
    endpoint = SOLAREDGE_BASE + SOLAREDGE_SITE_ENERGY_PATH.format(site_id=site_id)
    try:
        response = requests.get(endpoint, params=query, headers=HTTP_HEADERS, timeout=(8, 25))
        if response.status_code != 200:
            logger.warning("SolarEdge API status %s: %s", response.status_code, response.text[:300])
            return None
        payload = response.json()
        energy = payload.get("energy", {}) or payload.get("siteEnergy", {})
        samples = energy.get("values") or payload.get("values")
        if not samples:
            logger.warning("SolarEdge: nessun 'energy.values' in risposta")
            return None

        stamps: List[datetime] = []
        kilowatts: List[float] = []
        for sample in samples:
            raw_value = sample.get("value")
            if raw_value is None:
                continue
            text = sample.get("date") or sample.get("time") or ""
            try:
                has_clock = "T" in text or " " in text
                if has_clock:
                    moment = datetime.fromisoformat(text.replace("Z", "+00:00").strip())
                else:
                    moment = datetime.strptime(text[:10], "%Y-%m-%d").replace(tzinfo=TZINFO)
                # Naive timestamps are interpreted in the local plant timezone.
                if moment.tzinfo is None:
                    moment = moment.replace(tzinfo=TZINFO)
            except Exception:
                continue
            kw = float(raw_value) / 1000.0
            stamps.append(moment)
            kilowatts.append(round(kw, 3))

        if not stamps:
            return None
        return stamps, kilowatts
    except Exception as e:
        logger.exception("SolarEdge API error: %s", e)
        return None
|
||
|
||
|
||
def kwh_per_day_from_series(
    times: List[str],
    power_kw: List[float],
) -> List[Tuple[str, float]]:
    """Sum an hourly series per calendar day.

    Args:
        times: timestamps, parsed with ``parse_times_to_datetime``.
        power_kw: one value per timestamp (extra timestamps or values are ignored).

    Returns:
        ``[(label, total), ...]`` in chronological order, with ``label``
        formatted as ``dd/mm`` and ``total`` rounded to 1 decimal.
    """
    totals = {}
    for moment, value in zip(parse_times_to_datetime(times), power_kw):
        day = moment.date()
        totals[day] = totals.get(day, 0.0) + value
    return [(day.strftime("%d/%m"), round(totals[day], 1)) for day in sorted(totals)]
|
||
|
||
|
||
def parse_times_to_datetime(times: List[str]) -> List[datetime]:
    """Parse timestamp strings into timezone-aware datetimes.

    Strings containing ``T`` are parsed as ISO 8601 (a trailing ``Z`` is
    read as UTC); anything else is parsed as a ``YYYY-MM-DD`` date. Naive
    results get ``TZINFO`` attached. An unparsable entry is replaced by the
    current time so the output always has the same length as the input.
    """
    parsed = []
    for stamp in times:
        try:
            if "T" in stamp:
                moment = datetime.fromisoformat(stamp.replace("Z", "+00:00"))
            else:
                moment = datetime.strptime(stamp, "%Y-%m-%d").replace(tzinfo=TZINFO)
            if moment.tzinfo is None:
                moment = moment.replace(tzinfo=TZINFO)
        except Exception:
            # Keep positions aligned with the input list.
            moment = datetime.now(TZINFO)
        parsed.append(moment)
    return parsed
|
||
|
||
|
||
def plot_past_48h(
    times: List[str],
    power_kw: List[float],
    title_suffix: str = "",
    real_times: Optional[List[datetime]] = None,
    real_power_kw: Optional[List[float]] = None,
) -> bytes:
    """Render the past-48h chart in memory and return the PNG bytes (nothing is saved to disk).

    Args:
        times: timestamps of the forecast series.
        power_kw: forecast power (kW), one value per timestamp.
        title_suffix: text appended to the chart title.
        real_times, real_power_kw: optional measured series; drawn only when
            both are non-empty and have the same length.

    Returns:
        The PNG image as bytes.
    """
    fig, ax = plt.subplots(figsize=(10, 4.5))
    try:
        dt_list = parse_times_to_datetime(times)
        ax.fill_between(dt_list, 0, power_kw, alpha=0.5, color="steelblue")
        ax.plot(dt_list, power_kw, color="navy", linewidth=1.2, label="Previsione (ICON Italia)")
        if real_times and real_power_kw and len(real_times) == len(real_power_kw):
            ax.plot(real_times, real_power_kw, color="darkorange", linewidth=1.4, label="Produzione reale (SolarEdge)")
        ax.axhline(y=INVERTER_KW, color="red", linestyle="--", alpha=0.7, label=f"Limite inverter {INVERTER_KW} kW")
        ax.set_ylabel("Potenza AC (kW)")
        ax.set_xlabel("Data / Ora")
        ax.set_title(f"Produzione – Ieri l'altro e Ieri{title_suffix}")
        ax.legend(loc="upper right")
        all_vals = list(power_kw) + (list(real_power_kw) if real_power_kw else [])
        ax.set_ylim(0, max(INVERTER_KW * 1.05, max(all_vals) * 1.1) if all_vals else 7)
        ax.xaxis.set_major_formatter(mdates.DateFormatter("%d/%m %H:%M", tz=TZINFO))
        # The locator needs the same timezone as the formatter; otherwise the
        # ticks fall on UTC hours and are labelled 01:00/07:00/... local time.
        ax.xaxis.set_major_locator(mdates.HourLocator(interval=6, tz=TZINFO))
        plt.xticks(rotation=25)
        fig.tight_layout()
        buf = BytesIO()
        fig.savefig(buf, format="png", dpi=120)
        return buf.getvalue()
    finally:
        # Always release the figure, even when plotting fails.
        plt.close(fig)
|
||
|
||
|
||
def plot_future_72h(
    times: List[str],
    power_series: List[Tuple[str, List[float]]],
    title_suffix: str = "",
) -> bytes:
    """Render the 72h forecast chart in memory and return the PNG bytes.

    Args:
        times: timestamps shared by all series.
        power_series: ``[(label, power_kw_list), ...]``; the first series is
            drawn with a filled area, the others as lines only (model
            comparison). Series are zero-padded or truncated to ``len(times)``.
        title_suffix: text appended to the chart title.

    Returns:
        The PNG image as bytes, or ``b""`` when ``power_series`` is empty.
    """
    if not power_series:
        plt.close("all")
        return b""
    fig, ax = plt.subplots(figsize=(10, 4.5))
    try:
        dt_list = parse_times_to_datetime(times)
        colors = ["green", "blue", "purple"]
        all_vals = []
        for i, (label, power_kw) in enumerate(power_series):
            if len(power_kw) != len(dt_list):
                power_kw = (power_kw + [0.0] * len(dt_list))[: len(dt_list)]
            all_vals.extend(power_kw)
            color = colors[i % len(colors)]
            if i == 0:
                ax.fill_between(dt_list, 0, power_kw, alpha=0.4, color=color)
            ax.plot(dt_list, power_kw, color=color, linewidth=1.2, label=label)
        ax.axhline(y=INVERTER_KW, color="red", linestyle="--", alpha=0.7, label=f"Limite inverter {INVERTER_KW} kW")
        ax.set_ylabel("Potenza AC (kW)")
        ax.set_xlabel("Data / Ora")
        ax.set_title(f"Previsione produzione – Oggi, Domani, Dopodomani{title_suffix}")
        ax.legend(loc="upper right")
        ax.set_ylim(0, max(INVERTER_KW * 1.05, max(all_vals) * 1.1) if all_vals else 7)
        ax.xaxis.set_major_formatter(mdates.DateFormatter("%d/%m %H:%M", tz=TZINFO))
        # The locator needs the same timezone as the formatter; otherwise the
        # ticks fall on UTC hours and are labelled 01:00/07:00/... local time.
        ax.xaxis.set_major_locator(mdates.HourLocator(interval=6, tz=TZINFO))
        plt.xticks(rotation=25)
        fig.tight_layout()
        buf = BytesIO()
        fig.savefig(buf, format="png", dpi=120)
        return buf.getvalue()
    finally:
        # Always release the figure, even when plotting fails.
        plt.close(fig)
|
||
|
||
|
||
def load_bot_token() -> str:
    """Return the Telegram bot token, or an empty string when none is configured.

    Sources, in order: the ``TELEGRAM_BOT_TOKEN`` env var, the ``BOT_TOKEN``
    env var, then the first non-empty file among ``TOKEN_FILE_HOME`` and
    ``TOKEN_FILE_ETC``. A file that is missing or cannot be read (e.g.
    permission denied) is skipped instead of aborting the script.
    """
    tok = os.environ.get("TELEGRAM_BOT_TOKEN", "").strip() or os.environ.get("BOT_TOKEN", "").strip()
    if tok:
        return tok
    for path in (TOKEN_FILE_HOME, TOKEN_FILE_ETC):
        try:
            with open(path, "r", encoding="utf-8") as f:
                t = f.read().strip()
        except (OSError, UnicodeDecodeError):
            # OSError covers FileNotFoundError, PermissionError, IsADirectoryError, ...
            continue
        if t:
            return t
    return ""
|
||
|
||
|
||
def telegram_send_photo(photo_bytes: bytes, caption: str, chat_id: str) -> bool:
    """Send a photo to Telegram straight from bytes (no file on disk).

    Args:
        photo_bytes: PNG image content, uploaded as ``grafico.png``.
        caption: caption, sent with ``parse_mode`` set to Markdown.
        chat_id: destination chat.

    Returns:
        True when Telegram answers with status 200, False otherwise
        (missing token, non-200 status or any error; all are logged).
    """
    bot_token = load_bot_token()
    if not bot_token:
        logger.warning("Token Telegram mancante")
        return False
    endpoint = f"https://api.telegram.org/bot{bot_token}/sendPhoto"
    form = {"chat_id": chat_id, "caption": caption, "parse_mode": "Markdown"}
    attachment = {"photo": ("grafico.png", photo_bytes, "image/png")}
    try:
        response = requests.post(endpoint, data=form, files=attachment, timeout=20)
        if response.status_code == 200:
            return True
        logger.error("Telegram sendPhoto %s: %s", response.status_code, response.text[:400])
    except Exception as e:
        logger.exception("Telegram sendPhoto: %s", e)
    return False
|
||
|
||
|
||
def telegram_send_message(text: str, chat_id: str) -> bool:
    """Send a Markdown text message to Telegram.

    Args:
        text: message body, sent with ``parse_mode`` set to Markdown.
        chat_id: destination chat.

    Returns:
        True when Telegram answers with status 200, False otherwise.
        Failures are logged the same way ``telegram_send_photo`` logs them,
        so a rejected message is not silent.
    """
    token = load_bot_token()
    if not token:
        logger.warning("Token Telegram mancante")
        return False
    url = f"https://api.telegram.org/bot{token}/sendMessage"
    try:
        r = requests.post(
            url,
            json={"chat_id": chat_id, "text": text, "parse_mode": "Markdown"},
            timeout=15,
        )
        if r.status_code != 200:
            logger.error("Telegram sendMessage %s: %s", r.status_code, r.text[:400])
            return False
        return True
    except Exception as e:
        logger.exception("Telegram sendMessage: %s", e)
        return False
|
||
|
||
|
||
def main() -> int:
    """Command-line entry point: fetch data, render charts, optionally notify Telegram.

    Steps:
      1. parse the command-line options;
      2. delete ``fotovoltaico_*.png`` files left next to the script by
         earlier versions (charts are no longer saved to disk);
      3. unless ``--no-past``: fetch SolarEdge real production (when
         configured) and the historical forecast, then render the 48h chart;
      4. unless ``--no-future``: fetch the multi-model 72h forecast and
         render the chart;
      5. with ``--telegram`` and a non-empty ``--chat_id``: send the charts
         and a text summary with kWh per day.

    Returns:
        0 when at least one of the two datasets was produced, 1 otherwise.
    """
    parser = argparse.ArgumentParser(description="Previsione e analisi produzione fotovoltaico (ICON Italia)")
    parser.add_argument("--lat", type=float, default=HOME_LAT, help="Latitudine")
    parser.add_argument("--lon", type=float, default=HOME_LON, help="Longitudine")
    parser.add_argument("--telegram", action="store_true", help="Invia grafici e messaggio a Telegram")
    parser.add_argument("--chat_id", type=str, default="", help="Chat ID per invio Telegram")
    parser.add_argument("--no-past", action="store_true", help="Salta grafico 48h passate")
    parser.add_argument("--no-future", action="store_true", help="Salta grafico 72h future")
    args = parser.parse_args()
    lat, lon = args.lat, args.lon
    send_telegram = bool(args.telegram and args.chat_id.strip())
    if args.telegram and not send_telegram:
        logger.warning("--telegram richiede --chat_id: nessun invio a Telegram")

    # Remove photovoltaic charts saved by previous versions (they are no longer written).
    for old in glob.glob(os.path.join(SCRIPT_DIR, "fotovoltaico_*.png")):
        try:
            os.remove(old)
            logger.info("Rimosso %s", old)
        except OSError as e:
            logger.warning("Impossibile rimuovere %s: %s", old, e)

    hist: Optional[Tuple[List[str], List[float], List[float]]] = None

    solaredge_api_key, solaredge_site_id = load_solaredge_config()
    real_48h: Optional[Tuple[List[datetime], List[float]]] = None
    if solaredge_api_key and solaredge_site_id and not args.no_past:
        logger.info("Recupero produzione reale SolarEdge (48h)...")
        real_48h = fetch_solaredge_energy_48h(solaredge_api_key, solaredge_site_id)
        if not real_48h:
            logger.warning("SolarEdge: dati 48h non disponibili (controlla API key e Site ID)")

    past_ok = False
    if not args.no_past:
        logger.info(
            "Pannelli: %s gruppi (tilt, azimut°, n) = %s",
            len(PANEL_GROUPS),
            [(t, a, n) for t, a, n in PANEL_GROUPS],
        )
        logger.info("Recupero dati Historical Forecast (48h passate)...")
        hist = fetch_historical_48h(lat, lon)
        if hist:
            times_past, _, power_past = hist
            logger.info(
                "Historical 48h kWh per giorno (previsione): %s",
                kwh_per_day_from_series(times_past, power_past),
            )
            real_t = (real_48h[0], real_48h[1]) if real_48h else (None, None)
            img_past = plot_past_48h(times_past, power_past, real_times=real_t[0], real_power_kw=real_t[1])
            past_ok = True
            if send_telegram:
                caption = "📊 *Produzione – Ieri l'altro e Ieri* (ICON Italia" + (" + SolarEdge reale" if real_48h else "") + ")"
                telegram_send_photo(img_past, caption, args.chat_id)
        else:
            logger.warning("Nessun dato Historical Forecast per le 48h passate")

    future_ok = False
    fore_multi: Optional[Tuple[List[str], List[Tuple[str, List[float]]]]] = None
    if not args.no_future:
        logger.info("Recupero dati Forecast (72h future, ICON + AROME HD)...")
        fore_multi = fetch_forecast_72h_multi(lat, lon)
        if fore_multi:
            times_fut, power_series = fore_multi
            img_fut = plot_future_72h(times_fut, power_series)
            future_ok = True
            if send_telegram:
                models_label = " / ".join(s[0] for s in power_series)
                telegram_send_photo(
                    img_fut,
                    f"☀️ *Previsione – Oggi, Domani, Dopodomani* ({models_label})",
                    args.chat_id,
                )
        else:
            logger.warning("Nessun dato Forecast per le 72h future")

    if send_telegram:
        lines = ["🖥 *Fotovoltaico*", ""]
        if past_ok and hist:
            past_days = kwh_per_day_from_series(hist[0], hist[2])
            real_days = None
            if real_48h:
                real_days = kwh_per_day_from_series(
                    [t.isoformat() for t in real_48h[0]],
                    real_48h[1],
                )
            real_by_date = dict(real_days or [])
            lines.append("📊 *Ultimi 2 giorni*")
            for label, kwh in past_days:
                part = f" {label} · prev. {kwh} kWh"
                if label in real_by_date:
                    part += f" · _reale_ {real_by_date[label]} kWh"
                lines.append(part)
            lines.append("")
        if future_ok and fore_multi:
            times_fut, power_series = fore_multi
            fut_days_per_model = [
                (name, kwh_per_day_from_series(times_fut, power_list))
                for name, power_list in power_series
            ]
            lines.append("☀️ *Prossimi 3 giorni*")
            if len(fut_days_per_model) == 1:
                for label, kwh in fut_days_per_model[0][1]:
                    lines.append(f" {label} · {kwh} kWh")
            else:
                # Keep the chronological order produced by kwh_per_day_from_series:
                # sorting the "dd/mm" labels as strings would put 01/02 before 31/01.
                dates = list(dict.fromkeys(
                    d for _, days_list in fut_days_per_model for d, _ in days_list
                ))
                for d in dates:
                    parts = [f" {d}"]
                    for name, days_list in fut_days_per_model:
                        val = next((k for lbl, k in days_list if lbl == d), None)
                        if val is not None:
                            short = "ICON" if "ICON" in name else "AROME"
                            parts.append(f"{short} {val}")
                    lines.append(" · ".join(parts) + " kWh")
            lines.append("")
        # The forecast series may be missing (--no-future or a failed fetch);
        # the historical data always comes from ICON Italia.
        model_names = [name for name, _ in fore_multi[1]] if fore_multi else ["ICON Italia"]
        models_footer = " · ".join(model_names)
        lines.append(f"_Modello: {models_footer}_")
        telegram_send_message("\n".join(lines), args.chat_id)

    if not past_ok and not future_ok:
        logger.error("Nessun dato disponibile")
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())
|