Files
loogle-scripts/services/telegram-bot/smart_irrigation_advisor.py
T

2270 lines
95 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Smart Irrigation Advisor - Consulente Agronomico "Smart Season"
Fornisce consigli pragmatici per la gestione stagionale dell'irrigazione del giardino
basati su dati meteo e stato del suolo.
Pianificazione irrigazione: logica allineata a "A Guide to Soil Moisture" (ConnectedCrops)
e tabelle OMAFRA: FC, PWP, TAW; mantenere TAW > 50% (trigger a PAW); evitare saturazione
e percolazione profonda; suoli argillosi: irrigazioni lente e lunghe.
"""
import argparse
import datetime
import json
import logging
import os
import sys
from logging.handlers import RotatingFileHandler
from statistics import median as _median
from typing import Dict, List, Optional, Tuple
from zoneinfo import ZoneInfo
import requests
from dateutil import parser
from open_meteo_client import open_meteo_get
# =============================================================================
# CONFIGURATION
# =============================================================================
DEBUG = os.environ.get("DEBUG", "0").strip() == "1"
# Location
DEFAULT_LAT = 43.9356
DEFAULT_LON = 12.4296
DEFAULT_LOCATION_NAME = "🏠 Casa"
# Timezone
TZ = "Europe/Berlin"
TZINFO = ZoneInfo(TZ)
# Open-Meteo: due fonti
# - Suolo (tutti i layer): ICON Seamless (DWD) - copertura Europa centrale
# - Meteo (ET₀, precipitazioni, T°): analisi a tre modelli con mediana (vedi OPEN_METEO_MODELS.md)
OPEN_METEO_URL = "https://api.open-meteo.com/v1/forecast"
MODEL_SOIL = "icon_seamless" # Dati suolo (0-1, 1-3, 3-9, 9-27, 27-81 cm) e T° suolo; forecast_days=8
MODEL_WEATHER = "italia_meteo_arpae_icon_2i" # Retrocompatibilità / primo modello
MODEL_ICON = MODEL_WEATHER # Retrocompatibilità
# Tre modelli per mediana (Europa/Italia: ICON Italia + ECMWF IFS + ARPEGE/Météo-France; ARPEGE preferito a GFS)
WEATHER_MODELS_THREE = [
"italia_meteo_arpae_icon_2i", # ~3 d utili, 2 km Italia/SM
"ecmwf_ifs", # 15 d, ~9 km
"meteofrance_seamless", # ARPEGE+AROME, 4 d, 0.1° Europa
]
WEATHER_MODELS_FORECAST_DAYS = {
"italia_meteo_arpae_icon_2i": 10,
"ecmwf_ifs": 10,
"meteofrance_seamless": 4,
}
HTTP_HEADERS = {"User-Agent": "Smart-Irrigation-Advisor/2.0"}
# Files
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
LOG_FILE = os.path.join(BASE_DIR, "irrigation_advisor.log")
STATE_FILE = os.path.join(BASE_DIR, "irrigation_state.json")
# Telegram (opzionale, per integrazione bot)
TELEGRAM_CHAT_IDS = ["64463169", "24827341", "132455422", "5405962012"]
TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token")
TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token"
# Soglie Agronomiche
SOIL_TEMP_WAKEUP_THRESHOLD = 10.0 # °C - Soglia di risveglio vegetativo
SOIL_TEMP_WAKEUP_DAYS_MIN = 3 # Giorni consecutivi minimi per risveglio
SOIL_TEMP_WAKEUP_DAYS_MAX = 5 # Giorni consecutivi massimi per risveglio
SOIL_TEMP_SHUTDOWN_THRESHOLD = 10.0 # °C - Soglia di chiusura autunnale
SOIL_TEMP_WAKEUP_INDICATOR = 8.0 # °C - Soglia indicatore di avvicinamento al risveglio (sblocca report)
# Tipo di suolo: "clay_loam" (default), "clay", "loam", "sandy", "medium". Imposta IRRIGATION_SOIL_TYPE in env.
# Parametri in m³/m³ (VWC). Franco Argilloso (Clay Loam): CC 32-36%, PA 18-20%, AWC 14-16%, Infiltrazione 5-10 mm/h.
# FC = Field Capacity, PWP = Punto Appassimento, AWC = Acqua Disponibile. Trigger = FC - 0.5*AWC (MAD 50%).
_SOIL_TYPE = os.environ.get("IRRIGATION_SOIL_TYPE", "clay_loam").strip().lower().replace(" ", "_")
if _SOIL_TYPE == "clay_loam":
# Franco Argilloso (Clay Loam): CC 32-36%, PA 18-20%, AWC 14-16% (tabella tessiture)
SOIL_MOISTURE_FIELD_CAPACITY = 0.34 # CC 34% (centro intervallo 32-36)
SOIL_MOISTURE_WILTING_POINT = 0.19 # PA 19% (18-20)
SOIL_MOISTURE_DEEP_STRESS = 0.265 # Trigger irrigazione: FC - 0.5*AWC (AWC 15% → 34 - 7.5 ≈ 26.5%)
SOIL_MOISTURE_AUTUMN_HIGH = 0.34 # A FC = suolo bagnato, ok chiusura autunnale
SOIL_INFILTRATION_MMH = 7.5 # Infiltrazione 5-10 mm/h (riferimento per irrigazione lenta)
elif _SOIL_TYPE == "clay":
# ARGILLA pura: FC 37.5%, PWP 24% (OMAFRA). Sotto 10°C = blocco totale.
SOIL_MOISTURE_FIELD_CAPACITY = 0.38
SOIL_MOISTURE_WILTING_POINT = 0.22
SOIL_MOISTURE_DEEP_STRESS = 0.28
SOIL_MOISTURE_AUTUMN_HIGH = 0.38
SOIL_INFILTRATION_MMH = 5.0
elif _SOIL_TYPE == "loam":
# Loam (OMAFRA): PWP 12.5%, FC 25%, TAW 12.5%
SOIL_MOISTURE_FIELD_CAPACITY = 0.25
SOIL_MOISTURE_WILTING_POINT = 0.125
SOIL_MOISTURE_DEEP_STRESS = 0.19 # FC - 0.5*TAW
SOIL_MOISTURE_AUTUMN_HIGH = 0.25
SOIL_INFILTRATION_MMH = 10.0
elif _SOIL_TYPE == "sandy":
SOIL_MOISTURE_FIELD_CAPACITY = 0.35
SOIL_MOISTURE_WILTING_POINT = 0.10
SOIL_MOISTURE_DEEP_STRESS = 0.18
SOIL_MOISTURE_AUTUMN_HIGH = 0.45
SOIL_INFILTRATION_MMH = 15.0
else:
# Medium / silt loam (default generico)
SOIL_MOISTURE_FIELD_CAPACITY = 0.6
SOIL_MOISTURE_WILTING_POINT = 0.3
SOIL_MOISTURE_DEEP_STRESS = 0.35
SOIL_MOISTURE_AUTUMN_HIGH = 0.55
SOIL_INFILTRATION_MMH = 10.0
PRECIP_THRESHOLD_SIGNIFICANT = 5.0 # mm - Pioggia significativa
PRECIP_VETO_MM_24H = 4.0 # Veto avvio: se pioggia ultime 24h >= questo valore, non irrigare (dato da sensore/API se disponibile)
AUTUMN_HIGH_MOISTURE_DAYS = 5 # Giorni consecutivi con umidità alta per chiusura (path saturazione)
# Suoli a base argilla: irrigazioni lente e lunghe, evitare brevi rinfrescate (runoff, crusting)
SOIL_IS_CLAY = _SOIL_TYPE in ("clay", "clay_loam")
# Veto e sicurezza
AIR_TEMP_FREEZE_VETO = 4.0 # °C - Temperatura aria min: sotto questa soglia non irrigare (rischio ghiaccio)
SHORTWAVE_DAYTIME_LOCK_WM2 = 400.0 # W/m² - Evitare irrigazione pesante in pieno sole (alta evaporazione)
# Modello a serbatoio (bucket) per quantità / minuti suggeriti (ET0 da Open-Meteo; in futuro SolarEdge)
WATER_BALANCE_MAX_MM = 20.0 # mm - Capacità serbatoio (acqua disponibile prima di stress)
WATER_BALANCE_CRITICAL_MM = 10.0 # mm - Sotto questa soglia si suggerisce irrigazione
KC_LAWN = 0.8 # Coefficiente colturale prato/giardino
APPLIED_MM_PER_HOUR = 8.0 # mm/h - Portata irrigatore tipica (adattare al proprio impianto)
# =============================================================================
# CLASSIFICAZIONE VALORI PARAMETRI
# =============================================================================
# Soglie per classificare i parametri come bassi, medio/bassi, medi, alti, medio/alti
# Evapotraspirazione (ET₀) - mm/d
ET0_LOW = 2.0 # < 2.0 mm/d = basso
ET0_MEDIUM_LOW = 3.5 # 2.0-3.5 mm/d = medio/basso
ET0_MEDIUM_HIGH = 5.0 # 3.5-5.0 mm/d = medio/alto
# > 5.0 mm/d = alto
# Temperatura suolo - °C
SOIL_TEMP_LOW = 5.0 # < 5°C = basso
SOIL_TEMP_MEDIUM_LOW = 10.0 # 5-10°C = medio/basso
SOIL_TEMP_MEDIUM_HIGH = 15.0 # 10-15°C = medio/alto
# > 15°C = alto
# Umidità suolo: classificazione espositiva usa le soglie agronomiche per tipo di suolo
# (SOIL_MOISTURE_WILTING_POINT, DEEP_STRESS, FIELD_CAPACITY) → vedi classify_soil_moisture()
# VPD - kPa
VPD_LOW = 0.5 # < 0.5 kPa = basso (umido)
VPD_MEDIUM_LOW = 0.8 # 0.5-0.8 kPa = medio/basso
VPD_MEDIUM_HIGH = 1.2 # 0.8-1.2 kPa = medio/alto
# > 1.2 kPa = alto (secco, stress idrico)
# Sunshine duration - ore/giorno
SUNSHINE_LOW = 4.0 # < 4h = basso
SUNSHINE_MEDIUM_LOW = 6.0 # 4-6h = medio/basso
SUNSHINE_MEDIUM_HIGH = 8.0 # 6-8h = medio/alto
# > 8h = alto
# Precipitazioni - mm/giorno
PRECIP_DAILY_LOW = 2.0 # < 2mm/giorno = basso
PRECIP_DAILY_MEDIUM_LOW = 5.0 # 2-5mm/giorno = medio/basso
PRECIP_DAILY_MEDIUM_HIGH = 15.0 # 5-15mm/giorno = medio/alto
# > 15mm/giorno = alto
# =============================================================================
# LOGGING
# =============================================================================
def setup_logger() -> logging.Logger:
logger = logging.getLogger("irrigation_advisor")
logger.setLevel(logging.DEBUG if DEBUG else logging.INFO)
logger.handlers.clear()
fmt = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
parent_dir = os.path.dirname(LOG_FILE)
if parent_dir and not os.path.exists(parent_dir):
os.makedirs(parent_dir, exist_ok=True)
try:
fh = RotatingFileHandler(LOG_FILE, maxBytes=500_000, backupCount=3, encoding="utf-8")
fh.setLevel(logging.DEBUG)
fh.setFormatter(fmt)
logger.addHandler(fh)
except PermissionError:
fallback_log = "/tmp/irrigation_advisor.log"
try:
fh = RotatingFileHandler(fallback_log, maxBytes=500_000, backupCount=3, encoding="utf-8")
fh.setLevel(logging.DEBUG)
fh.setFormatter(fmt)
logger.addHandler(fh)
logger.warning("Permesso negato su %s, uso fallback %s", LOG_FILE, fallback_log)
except Exception:
sh = logging.StreamHandler()
sh.setLevel(logging.DEBUG)
sh.setFormatter(fmt)
logger.addHandler(sh)
logger.warning("Permesso negato su %s, fallback su stderr", LOG_FILE)
except Exception:
sh = logging.StreamHandler()
sh.setLevel(logging.DEBUG)
sh.setFormatter(fmt)
logger.addHandler(sh)
logger.warning("Errore logger file %s, fallback su stderr", LOG_FILE)
if DEBUG:
sh = logging.StreamHandler()
sh.setLevel(logging.DEBUG)
sh.setFormatter(fmt)
logger.addHandler(sh)
return logger
LOGGER = setup_logger()
# =============================================================================
# UTILITIES
# =============================================================================
def now_local() -> datetime.datetime:
return datetime.datetime.now(TZINFO)
def parse_time_to_local(t: str) -> datetime.datetime:
dt = parser.isoparse(t)
if dt.tzinfo is None:
return dt.replace(tzinfo=TZINFO)
return dt.astimezone(TZINFO)
def ensure_parent_dir(path: str) -> None:
parent = os.path.dirname(path)
if parent and not os.path.exists(parent):
os.makedirs(parent, exist_ok=True)
def read_text_file(path: str) -> str:
try:
with open(path, "r", encoding="utf-8") as f:
return f.read().strip()
except FileNotFoundError:
return ""
except Exception as e:
LOGGER.debug("Error reading %s: %s", path, e)
return ""
def load_bot_token() -> str:
tok = os.environ.get("TELEGRAM_BOT_TOKEN", "").strip()
if tok:
return tok
tok = read_text_file(TOKEN_FILE_HOME)
if tok:
return tok
tok = read_text_file(TOKEN_FILE_ETC)
return tok.strip() if tok else ""
# =============================================================================
# STATE MANAGEMENT
# =============================================================================
def load_state() -> Dict:
default = {
"phase": "unknown", # "wakeup", "active", "shutdown", "dormant"
"last_check": None,
"soil_temp_history": [], # Lista di (date, temp_6cm)
"soil_moisture_history": [], # Lista di (date, moisture_3_9cm, moisture_9_27cm)
"high_moisture_streak": 0,
"auto_reporting_enabled": False,
"wakeup_threshold_reached": False,
"shutdown_confirmed": False,
"last_auto_report_date": None,
"wakeup_notified_for_month": None,
"last_irrigation_need": None,
# Modello a serbatoio (bucket) e grafico
"water_balance_mm": WATER_BALANCE_MAX_MM, # Bilancio idrico stimato (mm)
"last_balance_date": None, # Ultima data di aggiornamento bilancio (ISO)
"daily_history": [], # Ultimi 7 giorni: [{date, temp_6, moist_9_27, moist_27_81, et0, precip}]
"last_24h_precip_mm": None, # Pioggia ultime 24h (da sensore/API esterna; se None veto non applicato)
}
if os.path.exists(STATE_FILE):
try:
with open(STATE_FILE, "r", encoding="utf-8") as f:
data = json.load(f) or {}
default.update(data)
except Exception as e:
LOGGER.exception("State read error: %s", e)
return default
def save_state(state: Dict) -> None:
try:
ensure_parent_dir(STATE_FILE)
with open(STATE_FILE, "w", encoding="utf-8") as f:
json.dump(state, f, ensure_ascii=False, indent=2)
except Exception as e:
LOGGER.exception("State write error: %s", e)
# =============================================================================
# OPEN-METEO API (doppia fonte: suolo ICON Seamless, meteo ICON Italia)
# =============================================================================
def fetch_soil_icon_seamless(lat: float, lon: float, timezone: str = TZ) -> Optional[Dict]:
"""
Recupera solo dati suolo e temperatura suolo da ICON Seamless (Open-Meteo).
Fornisce tutti i layer: 0-1, 1-3, 3-9, 9-27, 27-81 cm e T° 0, 6, 18, 54 cm (8 giorni).
"""
params = {
"latitude": lat,
"longitude": lon,
"timezone": timezone,
"forecast_days": 8,
"hourly": ",".join([
"soil_temperature_0cm",
"soil_temperature_6cm",
"soil_temperature_18cm",
"soil_temperature_54cm",
"soil_moisture_0_to_1cm",
"soil_moisture_1_to_3cm",
"soil_moisture_3_to_9cm",
"soil_moisture_9_to_27cm",
"soil_moisture_27_to_81cm",
]),
"models": MODEL_SOIL,
}
try:
r = open_meteo_get(OPEN_METEO_URL, params=params, headers=HTTP_HEADERS, timeout=(5, 30))
if r.status_code != 200:
LOGGER.warning("Soil fetch %s: %s", r.status_code, r.text[:200])
return None
return r.json()
except Exception as e:
LOGGER.warning("Soil (ICON Seamless) request error: %s", e)
return None
def fetch_weather_icon_italia(lat: float, lon: float, timezone: str = TZ) -> Optional[Dict]:
"""
Recupera dati meteo da ICON Italia (risoluzione spaziale migliore per Italia/San Marino):
ET₀, precipitazioni, temperatura, radiazione, umidità, VPD, sunshine.
"""
params = {
"latitude": lat,
"longitude": lon,
"timezone": timezone,
"forecast_days": 10,
"hourly": ",".join([
"precipitation",
"snowfall",
"temperature_2m",
"relative_humidity_2m",
"et0_fao_evapotranspiration",
"vapour_pressure_deficit",
"direct_radiation",
"diffuse_radiation",
"shortwave_radiation",
"sunshine_duration",
]),
"daily": ",".join([
"precipitation_sum",
"snowfall_sum",
"et0_fao_evapotranspiration_sum",
"sunshine_duration",
]),
"models": MODEL_WEATHER,
}
try:
r = open_meteo_get(OPEN_METEO_URL, params=params, headers=HTTP_HEADERS, timeout=(5, 30))
r.raise_for_status()
return r.json()
except Exception as e:
LOGGER.exception("Open-Meteo weather (ICON Italia) request error: %s", e)
return None
def _weather_params_common(lat: float, lon: float, timezone: str) -> Dict:
"""Parametri comuni hourly/daily per fetch meteo (ET₀, precipitazioni, ecc.)."""
return {
"latitude": lat,
"longitude": lon,
"timezone": timezone,
"hourly": ",".join([
"precipitation",
"snowfall",
"temperature_2m",
"relative_humidity_2m",
"et0_fao_evapotranspiration",
"vapour_pressure_deficit",
"direct_radiation",
"diffuse_radiation",
"shortwave_radiation",
"sunshine_duration",
]),
"daily": ",".join([
"precipitation_sum",
"snowfall_sum",
"et0_fao_evapotranspiration_sum",
"sunshine_duration",
]),
}
def fetch_weather_single_model(
lat: float, lon: float, timezone: str, model: str, forecast_days: int = 10
) -> Optional[Dict]:
"""
Recupera dati meteo per un singolo modello Open-Meteo (stessa struttura di fetch_weather_icon_italia).
"""
params = _weather_params_common(lat, lon, timezone)
params["forecast_days"] = forecast_days
params["models"] = model
try:
r = open_meteo_get(OPEN_METEO_URL, params=params, headers=HTTP_HEADERS, timeout=(5, 30))
r.raise_for_status()
return r.json()
except Exception as e:
LOGGER.debug("Open-Meteo single model %s error: %s", model, e)
return None
def _median_or_single(values: List[Optional[float]]) -> Optional[float]:
"""Mediana dei valori numerici; ignora None. Se nessun valore valido, ritorna None."""
nums = [float(v) for v in values if v is not None]
if not nums:
return None
if len(nums) == 1:
return nums[0]
return _median(nums)
def _merge_daily_three_models_median(daily_list: List[Dict]) -> Dict:
"""
Unisce i daily di più risposte meteo: per ogni data (unione di tutte) calcola la mediana
di et0_fao_evapotranspiration_sum, precipitation_sum, snowfall_sum, sunshine_duration.
"""
time_idx: Dict[str, int] = {}
all_times: List[str] = []
for d in daily_list:
times = d.get("time", []) or []
for t in times:
key = str(t)[:10] if t else ""
if key and key not in time_idx:
time_idx[key] = len(all_times)
all_times.append(key)
if not all_times:
return {"time": [], "et0_fao_evapotranspiration_sum": [], "precipitation_sum": [], "snowfall_sum": [], "sunshine_duration": []}
# Per ogni data, indice in ogni daily
daily_keys = ["et0_fao_evapotranspiration_sum", "precipitation_sum", "snowfall_sum", "sunshine_duration"]
out: Dict[str, List] = {k: [] for k in daily_keys}
out["time"] = all_times
for date_str in all_times:
for key in daily_keys:
vals = []
for d in daily_list:
times = d.get("time", []) or []
arr = d.get(key, []) or []
for i, t in enumerate(times):
if str(t)[:10] == date_str and i < len(arr) and arr[i] is not None:
try:
vals.append(float(arr[i]))
except (TypeError, ValueError):
pass
break
out[key].append(_median_or_single(vals) if vals else None)
return out
def _merge_hourly_three_models_median(hourly_list: List[Dict]) -> Dict:
"""
Unisce gli hourly di più risposte: per ogni timestamp (unione) calcola la mediana
per ogni variabile numerica. Usa il primo dizionario per la lista dei nomi chiave.
"""
time_idx: Dict[str, int] = {}
all_times: List[str] = []
for h in hourly_list:
times = h.get("time", []) or []
for t in times:
k = _normalize_time_key(str(t)) if t else ""
if k and k not in time_idx:
time_idx[k] = len(all_times)
all_times.append(t if isinstance(t, str) else k)
if not all_times:
keys = [k for k in (list(hourly_list[0].keys()) if hourly_list else []) if k != "time"]
return {"time": [], **{k: [] for k in keys}}
keys = [k for k in (list(hourly_list[0].keys()) if hourly_list else []) if k != "time"]
out: Dict[str, List] = {"time": all_times}
for key in keys:
out[key] = []
for ref_t in all_times:
ref_k = _normalize_time_key(str(ref_t))
vals = []
for h in hourly_list:
times = h.get("time", []) or []
arr = h.get(key, []) or []
for i, t in enumerate(times):
if _normalize_time_key(str(t)) == ref_k and i < len(arr) and arr[i] is not None:
try:
vals.append(float(arr[i]))
except (TypeError, ValueError):
pass
break
out[key].append(_median_or_single(vals) if vals else None)
return out
def fetch_weather_three_models(lat: float, lon: float, timezone: str = TZ) -> Optional[Dict]:
"""
Recupera meteo da tre modelli (ICON Italia, ECMWF IFS, ARPEGE/Météo-France) e restituisce
un unico payload con daily e hourly ottenuti dalla mediana per ogni giorno/ora.
Vedi OPEN_METEO_MODELS.md per la motivazione (ARPEGE preferito a GFS per Europa/Italia).
"""
daily_list: List[Dict] = []
hourly_list: List[Dict] = []
meta = None
for model in WEATHER_MODELS_THREE:
fd = WEATHER_MODELS_FORECAST_DAYS.get(model, 10)
data = fetch_weather_single_model(lat, lon, timezone, model, forecast_days=fd)
if not data:
continue
if meta is None:
meta = {
"latitude": data.get("latitude"),
"longitude": data.get("longitude"),
"timezone": data.get("timezone"),
}
d = data.get("daily", {}) or {}
h = data.get("hourly", {}) or {}
if d.get("time"):
daily_list.append(d)
if h.get("time"):
hourly_list.append(h)
if not daily_list or meta is None:
LOGGER.warning("Three-model weather: no valid responses; fallback to single ICON Italia.")
return fetch_weather_icon_italia(lat, lon, timezone)
merged_daily = _merge_daily_three_models_median(daily_list)
merged_hourly = _merge_hourly_three_models_median(hourly_list) if hourly_list else {}
return {
"latitude": meta["latitude"],
"longitude": meta["longitude"],
"timezone": meta["timezone"],
"hourly": merged_hourly,
"daily": merged_daily,
}
def _normalize_time_key(t: str) -> str:
"""Normalizza timestamp per confronto (es. '2026-02-05T16:00' e '2026-02-05T16:00:00' → stesso key)."""
if not t or not isinstance(t, str):
return str(t) if t else ""
return t.strip()[:16] # YYYY-MM-DDTHH:MM
def _merge_hourly_by_time(soil_hourly: Dict, weather_hourly: Dict, weather_daily: Dict) -> Dict:
"""
Unisce hourly da soil (ICON Seamless, es. 8 giorni) e weather (ICON Italia, es. 10 giorni).
Usa i tempi del SOIL come riferimento così i layer 9-27 e 27-81 non si perdono; allinea
il meteo a questi tempi. Se il soil non ha 'time', fallback su weather come riferimento.
"""
soil_times = soil_hourly.get("time", []) or []
weather_times = weather_hourly.get("time", []) or []
if not soil_times and not weather_times:
return weather_hourly
# Riferimento: soil se disponibile (preserva 8 giorni e tutti i layer suolo)
use_soil_as_ref = len(soil_times) > 0
ref_times = soil_times if use_soil_as_ref else weather_times
out = {"time": ref_times}
# Indice weather per ogni timestamp (normalizzato per evitare mismatch formato)
weather_idx_by_key = {}
for i, t in enumerate(weather_times):
k = _normalize_time_key(t)
if k and k not in weather_idx_by_key:
weather_idx_by_key[k] = i
soil_idx_by_key = {}
for i, t in enumerate(soil_times):
k = _normalize_time_key(t)
if k and k not in soil_idx_by_key:
soil_idx_by_key[k] = i
# Copia serie soil: allineate per ref_times (se ref=soil, copia diretta; altrimenti lookup)
for key, values in soil_hourly.items():
if key == "time":
continue
if use_soil_as_ref and values is not None and len(values) == len(ref_times):
out[key] = list(values)
else:
out[key] = []
for i, t in enumerate(ref_times):
k = _normalize_time_key(t)
idx = soil_idx_by_key.get(k, i) if soil_times else i
if values and idx < len(values) and values[idx] is not None:
out[key].append(values[idx])
else:
out[key].append(None)
# Copia serie weather allineate a ref_times
for key, values in weather_hourly.items():
if key == "time":
continue
if key in out:
continue
out[key] = []
for i, t in enumerate(ref_times):
k = _normalize_time_key(t)
idx = weather_idx_by_key.get(k, i) if weather_times else i
if values and idx < len(values) and values[idx] is not None:
out[key].append(values[idx])
else:
out[key].append(None)
return out
def fetch_soil_and_weather(lat: float, lon: float, timezone: str = TZ) -> Optional[Dict]:
"""
Recupera dati combinati: suolo da ICON Seamless (tutti i layer), meteo da analisi
a tre modelli (ICON Italia + ECMWF IFS + ARPEGE) con mediana di ET₀ e precipitazioni.
In caso di fallimento suolo, prova fallback con singola fonte (solo ICON Italia).
"""
soil_data = fetch_soil_icon_seamless(lat, lon, timezone)
weather_data = fetch_weather_three_models(lat, lon, timezone)
if not weather_data:
return None
hourly_w = weather_data.get("hourly", {}) or {}
daily_w = weather_data.get("daily", {}) or {}
if not soil_data or not soil_data.get("hourly"):
# Fallback: chiedi a ICON Italia anche i parametri suolo (meno layer)
LOGGER.info("Soil da ICON Seamless non disponibile; uso solo ICON Italia.")
return fetch_soil_and_weather_fallback(lat, lon, timezone)
hourly_s = soil_data.get("hourly", {}) or {}
hourly_merged = _merge_hourly_by_time(hourly_s, hourly_w, daily_w)
return {
"latitude": weather_data.get("latitude"),
"longitude": weather_data.get("longitude"),
"timezone": weather_data.get("timezone"),
"hourly": hourly_merged,
"daily": daily_w,
}
def fetch_soil_and_weather_fallback(lat: float, lon: float, timezone: str = TZ) -> Optional[Dict]:
"""Fallback: singola chiamata a ICON Italia con suolo (layer limitati) + meteo."""
params = {
"latitude": lat,
"longitude": lon,
"timezone": timezone,
"forecast_days": 10,
"hourly": ",".join([
"soil_temperature_0cm",
"soil_temperature_6cm",
"soil_temperature_18cm",
"soil_temperature_54cm",
"soil_moisture_0_to_1cm",
"soil_moisture_3_to_9cm",
"soil_moisture_9_to_27cm",
"soil_moisture_27_to_81cm",
"precipitation",
"snowfall",
"temperature_2m",
"relative_humidity_2m",
"et0_fao_evapotranspiration",
"vapour_pressure_deficit",
"shortwave_radiation",
"sunshine_duration",
]),
"daily": ",".join([
"precipitation_sum",
"snowfall_sum",
"et0_fao_evapotranspiration_sum",
"sunshine_duration",
]),
"models": MODEL_WEATHER,
}
try:
r = open_meteo_get(OPEN_METEO_URL, params=params, headers=HTTP_HEADERS, timeout=(5, 30))
if r.status_code == 400:
return None
r.raise_for_status()
return r.json()
except Exception as e:
LOGGER.exception("Fallback fetch error: %s", e)
return None
def fetch_weather_only(lat: float, lon: float, timezone: str = TZ) -> Optional[Dict]:
"""Recupera solo dati meteo (senza suolo): analisi a tre modelli con mediana."""
return fetch_weather_three_models(lat, lon, timezone)
# =============================================================================
# SEASONAL PHASE DETECTION
# =============================================================================
def determine_seasonal_phase(
month: int,
soil_temp_6cm: Optional[float],
soil_moisture_3_9cm: Optional[float],
soil_moisture_9_27cm: Optional[float],
state: Dict
) -> str:
"""
Determina la fase stagionale: "wakeup", "active", "shutdown", "dormant"
"""
# Primavera (Marzo-Maggio): fase risveglio o attiva
if month in [3, 4, 5]:
# Se temperatura suolo > soglia per X giorni consecutivi -> attiva
# Altrimenti -> wakeup
if soil_temp_6cm is not None and soil_temp_6cm >= SOIL_TEMP_WAKEUP_THRESHOLD:
# Verifica persistenza (dai dati storici o corrente)
temp_history = state.get("soil_temp_history", [])
recent_warm_days = 0
now = now_local()
for date_str, temp in temp_history:
try:
date_obj = datetime.datetime.fromisoformat(date_str).replace(tzinfo=TZINFO)
days_ago = (now - date_obj).days
if days_ago <= SOIL_TEMP_WAKEUP_DAYS_MAX and temp >= SOIL_TEMP_WAKEUP_THRESHOLD:
recent_warm_days += 1
except Exception:
continue
if recent_warm_days >= SOIL_TEMP_WAKEUP_DAYS_MIN - 1 or soil_temp_6cm >= SOIL_TEMP_WAKEUP_THRESHOLD:
return "active"
else:
return "wakeup"
else:
return "wakeup"
# Estate (Giugno-Agosto): sempre attiva
elif month in [6, 7, 8]:
return "active"
# Autunno (Settembre-Novembre): attiva o shutdown
elif month in [9, 10, 11]:
if soil_temp_6cm is not None and soil_temp_6cm < SOIL_TEMP_SHUTDOWN_THRESHOLD:
return "shutdown"
elif (soil_moisture_9_27cm is not None and
soil_moisture_9_27cm >= SOIL_MOISTURE_AUTUMN_HIGH):
# Verifica giorni consecutivi con umidità alta
high_streak = state.get("high_moisture_streak", 0)
if high_streak >= AUTUMN_HIGH_MOISTURE_DAYS:
return "shutdown"
return "active"
# Inverno (Dicembre-Febbraio): dormiente
else:
return "dormant"
# =============================================================================
# CLASSIFICATION HELPERS
# =============================================================================
def classify_et0(et0: float) -> str:
"""Classifica ET₀ in basso, medio/basso, medio, medio/alto, alto"""
if et0 < ET0_LOW:
return "basso"
elif et0 < ET0_MEDIUM_LOW:
return "medio/basso"
elif et0 < ET0_MEDIUM_HIGH:
return "medio"
elif et0 < 7.0:
return "medio/alto"
else:
return "alto"
def classify_soil_temp(temp: float) -> str:
"""Classifica temperatura suolo in basso, medio/basso, medio, medio/alto, alto"""
if temp < SOIL_TEMP_LOW:
return "basso"
elif temp < SOIL_TEMP_MEDIUM_LOW:
return "medio/basso"
elif temp < SOIL_TEMP_MEDIUM_HIGH:
return "medio"
elif temp < 20.0:
return "medio/alto"
else:
return "alto"
def classify_soil_moisture(moisture: float) -> str:
"""
Classifica umidità suolo (VWC) in base al tipo di suolo configurato.
Usa PWP, trigger irrigazione e capacità di campo: così 39% su clay loam = "alto" (pieno), non "medio/basso".
"""
if moisture < SOIL_MOISTURE_WILTING_POINT:
return "basso"
if moisture < SOIL_MOISTURE_DEEP_STRESS:
return "medio/basso"
if moisture < SOIL_MOISTURE_FIELD_CAPACITY * 0.95:
return "medio"
if moisture < SOIL_MOISTURE_FIELD_CAPACITY:
return "medio/alto"
return "alto"
def classify_vpd(vpd: float) -> str:
"""Classifica VPD in basso, medio/basso, medio, medio/alto, alto"""
if vpd < VPD_LOW:
return "basso"
elif vpd < VPD_MEDIUM_LOW:
return "medio/basso"
elif vpd < VPD_MEDIUM_HIGH:
return "medio"
elif vpd < 1.8:
return "medio/alto"
else:
return "alto"
def classify_sunshine(hours: float) -> str:
"""Classifica ore di sole in basso, medio/basso, medio, medio/alto, alto"""
if hours < SUNSHINE_LOW:
return "basso"
elif hours < SUNSHINE_MEDIUM_LOW:
return "medio/basso"
elif hours < SUNSHINE_MEDIUM_HIGH:
return "medio"
elif hours < 10.0:
return "medio/alto"
else:
return "alto"
def classify_precip_daily(precip: float) -> str:
"""Classifica precipitazione giornaliera in basso, medio/basso, medio, medio/alto, alto"""
if precip < PRECIP_DAILY_LOW:
return "basso"
elif precip < PRECIP_DAILY_MEDIUM_LOW:
return "medio/basso"
elif precip < PRECIP_DAILY_MEDIUM_HIGH:
return "medio"
elif precip < 30.0:
return "medio/alto"
else:
return "alto"
# =============================================================================
# IRRIGATION LOGIC
# =============================================================================
def calculate_water_stress_index(
moisture_3_9cm: Optional[float],
moisture_9_27cm: Optional[float],
vpd_avg: Optional[float] = None
) -> Tuple[float, str]:
"""
Calcola Indice di Stress Idrico (0-100%) usando umidità suolo e VPD.
VPD (Vapour Pressure Deficit) è un ottimo indicatore di stress idrico:
- VPD alto (>1.5 kPa) = stress idrico elevato
- VPD medio (0.8-1.5 kPa) = stress moderato
- VPD basso (<0.8 kPa) = condizioni ottimali
Returns: (index, level_description)
"""
if moisture_3_9cm is None and moisture_9_27cm is None:
# Se non abbiamo dati umidità, usa solo VPD se disponibile
if vpd_avg is not None:
if vpd_avg > 1.5:
return 85.0, "ROSSO_VPD"
elif vpd_avg > 1.0:
return 60.0, "ARANCIONE_VPD"
elif vpd_avg > 0.8:
return 30.0, "GIALLO_VPD"
else:
return 10.0, "VERDE_VPD"
return 50.0, "UNKNOWN" # Dati non disponibili
# Usa media pesata (superficie più importante)
if moisture_3_9cm is not None and moisture_9_27cm is not None:
effective_moisture = 0.6 * moisture_3_9cm + 0.4 * moisture_9_27cm
elif moisture_3_9cm is not None:
effective_moisture = moisture_3_9cm
else:
effective_moisture = moisture_9_27cm
# Calcola indice base rispetto a capacità di campo
if effective_moisture >= SOIL_MOISTURE_FIELD_CAPACITY:
index_base = 0.0
level = "VERDE"
elif effective_moisture <= SOIL_MOISTURE_WILTING_POINT:
index_base = 100.0
level = "ROSSO"
else:
# Interpolazione lineare
range_moisture = SOIL_MOISTURE_FIELD_CAPACITY - SOIL_MOISTURE_WILTING_POINT
deficit = SOIL_MOISTURE_FIELD_CAPACITY - effective_moisture
index_base = (deficit / range_moisture) * 100.0
if index_base >= 70:
level = "ARANCIONE"
elif index_base >= 40:
level = "GIALLO"
else:
level = "VERDE"
# Aggiusta indice usando VPD se disponibile
# VPD alto aumenta lo stress percepito, VPD basso lo riduce
final_index = index_base
if vpd_avg is not None:
vpd_factor = 1.0
if vpd_avg > 1.5:
vpd_factor = 1.3 # Aumenta stress del 30% se VPD molto alto
elif vpd_avg > 1.0:
vpd_factor = 1.15 # Aumenta stress del 15%
elif vpd_avg < 0.8:
vpd_factor = 0.9 # Riduce stress del 10% se VPD basso
final_index = min(100.0, index_base * vpd_factor)
# Aggiorna livello se VPD modifica significativamente l'indice
if vpd_avg > 1.5 and level != "ROSSO":
if final_index >= 70:
level = "ARANCIONE_VPD"
if final_index >= 85:
level = "ROSSO_VPD"
elif vpd_avg < 0.8 and index_base > 40:
if final_index < 40:
level = "GIALLO_VPD"
return final_index, level
def check_future_rainfall(daily_data: Dict, days_ahead: int = 5, include_snowfall: bool = True) -> Tuple[float, List[str]]:
"""
Controlla pioggia prevista nei prossimi giorni usando precipitation_sum.
precipitation_sum include già pioggia, neve e temporali (è la somma totale).
Returns: (total_mm, list_of_days_with_precip)
"""
daily_times = daily_data.get("time", []) or []
# Usa precipitation_sum che include già pioggia, neve e temporali
daily_precip = daily_data.get("precipitation_sum", []) or []
daily_snowfall = daily_data.get("snowfall_sum", []) or [] # Solo per indicare se c'è neve
total = 0.0
rainy_days = []
now = now_local()
for i, time_str in enumerate(daily_times[:days_ahead]):
try:
day_time = parse_time_to_local(time_str)
if day_time.date() <= now.date():
continue # Salta giorni passati
# precipitation_sum include già tutto (pioggia + neve + temporali)
precip = float(daily_precip[i]) if i < len(daily_precip) and daily_precip[i] is not None else 0.0
snow = float(daily_snowfall[i]) if (include_snowfall and i < len(daily_snowfall) and daily_snowfall[i] is not None) else 0.0
# Usa solo precipitation_sum (non sommare snowfall separatamente, è già incluso)
total_precip = precip
if total_precip > 0.1: # Almeno 0.1 mm
total += total_precip
if snow > 0.5: # Se c'è neve significativa
rainy_days.append(f"{day_time.strftime('%d/%m')} ({total_precip:.1f}mm, di cui {snow/10:.1f}cm neve)")
else:
rainy_days.append(f"{day_time.strftime('%d/%m')} ({precip:.1f}mm)")
except Exception:
continue
return total, rainy_days
def _irrigation_need_index(
heart_moisture: Optional[float],
et0_avg: Optional[float],
vpd_avg: Optional[float] = None
) -> float:
"""
Indice 0-100 di necessità di irrigazione (alta = serve acqua).
Usato per rilevare variazioni significative e decidere invio notifiche.
"""
if heart_moisture is None and et0_avg is None:
return 50.0
# Fattore umidità: bassa umidità = alto bisogno
m = heart_moisture if heart_moisture is not None else 0.5
moisture_factor = max(0, 1.0 - m) # 0 se saturo, 1 se secco
# Fattore ET0: alto ET0 = alto bisogno (normalizzato ~0-6 mm/d)
e = et0_avg if et0_avg is not None else 3.0
et0_factor = min(1.0, e / 6.0)
need = (moisture_factor * 50.0 + et0_factor * 50.0)
if vpd_avg is not None and vpd_avg > 1.0:
need = min(100.0, need * 1.1)
return round(need, 1)
def _will_exit_dormant_in_forecast(hourly: Dict, times: List[str], now: datetime.datetime) -> bool:
"""
True se nei prossimi 10 giorni la temperatura suolo prevista supera la soglia di risveglio
(uscita dallo stato dormiente invernale).
"""
soil_temps = hourly.get("soil_temperature_6cm", []) or hourly.get("soil_temperature_0cm", []) or []
if not times or not soil_temps:
return False
# Raggruppa per giorno e calcola media T suolo per ogni giorno
day_temps: Dict[str, List[float]] = {}
for i, t_str in enumerate(times[: min(10 * 24, len(times))]):
try:
t = parse_time_to_local(t_str)
if t.date() <= now.date():
continue
key = t.date().isoformat()
if key not in day_temps:
day_temps[key] = []
if i < len(soil_temps) and soil_temps[i] is not None:
try:
day_temps[key].append(float(soil_temps[i]))
except (TypeError, ValueError):
pass
except Exception:
continue
for _date_str, vals in day_temps.items():
if vals and (sum(vals) / len(vals)) >= SOIL_TEMP_WAKEUP_THRESHOLD:
return True
return False
def _irrigation_need_next_days(
daily: Dict,
current_heart_moisture: Optional[float],
days: int = 3
) -> List[float]:
"""
Stima indice di necessità irrigazione (0-100) per i prossimi giorni
usando ET0 e precipitazioni giornaliere (umidità stimata per giorno).
"""
daily_times = daily.get("time", []) or []
et0_sum = daily.get("et0_fao_evapotranspiration_sum", []) or []
precip_sum = daily.get("precipitation_sum", []) or []
now = now_local()
need_list = []
moisture = current_heart_moisture if current_heart_moisture is not None else 0.5
for i in range(len(daily_times)):
if len(need_list) >= days:
break
try:
day_time = parse_time_to_local(daily_times[i])
if day_time.date() <= now.date():
continue
except Exception:
continue
et0 = float(et0_sum[i]) if i < len(et0_sum) and et0_sum[i] is not None else 3.0
precip = float(precip_sum[i]) if i < len(precip_sum) and precip_sum[i] is not None else 0.0
moisture = moisture - et0 * 0.03 + precip * 0.04
moisture = max(0.2, min(0.9, moisture))
need_list.append(_irrigation_need_index(moisture, et0, None))
return need_list
# =============================================================================
# ADVICE GENERATION
# =============================================================================
def generate_wakeup_advice(
soil_temp_6cm: Optional[float],
soil_moisture_3_9cm: Optional[float],
soil_moisture_9_27cm: Optional[float],
future_rain_mm: float,
rainy_days: List[str],
shortwave_avg: Optional[float] = None,
sunshine_hours: Optional[float] = None,
state: Optional[Dict] = None
) -> Dict:
"""
FASE RISVEGLIO: "Quando accendere?"
Trigger: Termico + Energetico + Fotoperiodo
Returns: Dict con season_phase, advice_level, human_message, soil_status_summary
"""
state = state or {}
status = "**Fase: Risveglio Primaverile**"
# TRIGGER 1: Soglia Termica - Soil Temperature (6cm) > 10°C per 3-5 giorni consecutivi
temp_ok = False
temp_avg_24h = None
if soil_temp_6cm is not None:
# Calcola media 24h (se disponibile storico, altrimenti usa valore corrente)
temp_history = state.get("soil_temp_history", [])
now = now_local()
recent_temps = []
for date_str, temp in temp_history:
try:
date_obj = datetime.datetime.fromisoformat(date_str).replace(tzinfo=TZINFO)
days_ago = (now - date_obj).days
if days_ago <= 7: # Ultimi 7 giorni
recent_temps.append(temp)
except Exception:
continue
recent_temps.append(soil_temp_6cm)
if recent_temps:
temp_avg_24h = sum(recent_temps) / len(recent_temps)
# Verifica giorni consecutivi sopra soglia
warm_days = 0
for date_str, temp in temp_history:
try:
date_obj = datetime.datetime.fromisoformat(date_str).replace(tzinfo=TZINFO)
days_ago = (now - date_obj).days
if days_ago <= SOIL_TEMP_WAKEUP_DAYS_MAX and temp >= SOIL_TEMP_WAKEUP_THRESHOLD:
warm_days += 1
except Exception:
continue
if soil_temp_6cm >= SOIL_TEMP_WAKEUP_THRESHOLD:
warm_days += 1
temp_ok = (temp_avg_24h is not None and temp_avg_24h >= SOIL_TEMP_WAKEUP_THRESHOLD and
warm_days >= SOIL_TEMP_WAKEUP_DAYS_MIN)
# TRIGGER 2: Energetico - Shortwave Radiation GHI in crescita
energy_ok = False
if shortwave_avg is not None:
# Verifica se GHI mostra trend positivo (semplificato: > 150 W/m² indica buon irraggiamento)
energy_ok = shortwave_avg > 150.0
# TRIGGER 3: Fotoperiodo - Sunshine Duration in aumento
photoperiod_ok = False
if sunshine_hours is not None:
# Fotoperiodo adeguato per risveglio (almeno 6-7 ore di sole)
photoperiod_ok = sunshine_hours >= 6.0
# Trigger combinati: almeno 2 su 3 devono essere OK (termico è obbligatorio)
triggers_active = temp_ok and (energy_ok or photoperiod_ok)
# Umidità profonda sotto trigger irrigazione (sotto FC = "serbatoio vuoto" → serve acqua)
# Per argilla: trigger 28%; sotto 10°C si ignora (dormienza, rischio marciumi/ghiaccio)
moisture_deep_low = False
if soil_moisture_9_27cm is not None:
moisture_deep_low = soil_moisture_9_27cm < SOIL_MOISTURE_DEEP_STRESS
# Genera consiglio
if triggers_active and moisture_deep_low:
advice_level = "CRITICAL"
advice_msg = "🌱 **SVEGLIA IL SISTEMA**\n\n"
advice_msg += "Tutti i trigger di risveglio sono attivi:\n"
if temp_ok:
advice_msg += f"• Temperatura suolo stabile ≥{SOIL_TEMP_WAKEUP_THRESHOLD}°C\n"
if energy_ok:
advice_msg += f"• Irraggiamento solare adeguato ({shortwave_avg:.0f} W/m²)\n"
if photoperiod_ok:
advice_msg += f"• Fotoperiodo sufficiente ({sunshine_hours:.1f}h di sole)\n"
advice_msg += f"\nIl terreno profondo (9-27cm) sotto trigger irrigazione ({soil_moisture_9_27cm*100:.0f}% < {SOIL_MOISTURE_DEEP_STRESS*100:.0f}%). "
if future_rain_mm < PRECIP_THRESHOLD_SIGNIFICANT:
advice_msg += "Nessuna pioggia significativa prevista.\n\n"
else:
advice_msg += f"Pioggia prevista: {future_rain_mm:.1f}mm nei prossimi giorni.\n\n"
advice_msg += "**Consigliato**: Primo ciclo di test/attivazione dell'impianto di irrigazione.\n\n"
advice_msg += f"**Quando innaffiare le prime volte**: Quando T° suolo ≥10°C e umidità 9-27cm scende sotto ~{SOIL_MOISTURE_DEEP_STRESS*100:.0f}% (trigger), irriga abbondantemente fino a ~{SOIL_MOISTURE_FIELD_CAPACITY*100:.0f}% (capacità di campo). Con argilla: cicli lunghi e lenti, meno frequenti. Sotto 10°C: blocco totale, non irrigare (dormienza, rischio marciumi/ghiaccio)."
elif not temp_ok:
advice_level = "NO_ACTION"
advice_msg = "💤 **DORMI ANCORA**\n\n"
if soil_temp_6cm is not None:
advice_msg += f"Trigger termico non soddisfatto: temperatura suolo {soil_temp_6cm:.1f}°C < {SOIL_TEMP_WAKEUP_THRESHOLD}°C (soglia risveglio). "
else:
advice_msg += "Temperatura suolo non disponibile. "
advice_msg += "Le piante sono ancora in riposo vegetativo. Attendi che il terreno si scaldi stabilmente."
elif not moisture_deep_low:
advice_level = "NO_ACTION"
advice_msg = "💤 **DORMI ANCORA**\n\n"
if soil_moisture_9_27cm is not None:
advice_msg += f"Terreno profondo (9-27cm) ancora sufficientemente umido ({soil_moisture_9_27cm*100:.0f}%). "
advice_msg += "Nessuna necessità di irrigazione al momento."
else:
advice_level = "NO_ACTION"
advice_msg = "💤 **DORMI ANCORA**\n\n"
advice_msg += "Trigger energetici o fotoperiodo non ancora sufficienti. "
if future_rain_mm >= PRECIP_THRESHOLD_SIGNIFICANT:
advice_msg += f"Pioggia prevista: {future_rain_mm:.1f}mm nei prossimi giorni. "
if rainy_days:
advice_msg += f"Giorni: {', '.join(rainy_days)}.\n\n"
advice_msg += "Attendi condizioni più favorevoli prima di attivare l'impianto."
# Soil status summary
soil_summary_parts = []
if soil_temp_6cm is not None:
soil_summary_parts.append(f"T.Suolo 6cm: {soil_temp_6cm:.1f}°C")
if soil_moisture_9_27cm is not None:
soil_summary_parts.append(f"Umidità Radici (9-27cm): {soil_moisture_9_27cm*100:.0f}%")
soil_status_summary = " | ".join(soil_summary_parts) if soil_summary_parts else "Dati suolo non disponibili"
return {
"season_phase": "AWAKENING",
"advice_level": advice_level,
"human_message": advice_msg,
"soil_status_summary": soil_status_summary,
"status_display": status
}
def generate_active_advice(
soil_moisture_0_1cm: Optional[float],
soil_moisture_3_9cm: Optional[float],
soil_moisture_9_27cm: Optional[float],
soil_moisture_27_81cm: Optional[float], # Riserva profonda (se disponibile)
future_rain_mm: float,
rainy_days: List[str],
et0_avg: Optional[float],
next_2_days_rain: float,
vpd_avg: Optional[float] = None
) -> Dict:
"""
FASE ATTIVA: "Quanto irrigare?"
Logica allineata a "A Guide to Soil Moisture" (ConnectedCrops/OMAFRA):
- Zona ideale: tra PAW (50% TAW) e FC. Sotto PAW → stress, sopra FC → saturazione/percolazione.
- Cuore (3-9 + 9-27cm) e Riserva (27-81cm); ignora 0-1cm (troppo variabile).
"""
status = "**Fase: Piena Stagione (Primavera/Estate)**"
# Analisi stratificata - ignora fluttuazioni superficiali (0-1cm)
# Calcola media ponderata del "Cuore" del sistema (3-9cm e 9-27cm)
heart_moisture = None
if soil_moisture_3_9cm is not None and soil_moisture_9_27cm is not None:
# Media ponderata: 9-27cm più importante (60%)
heart_moisture = 0.4 * soil_moisture_3_9cm + 0.6 * soil_moisture_9_27cm
elif soil_moisture_9_27cm is not None:
heart_moisture = soil_moisture_9_27cm
elif soil_moisture_3_9cm is not None:
heart_moisture = soil_moisture_3_9cm
# Monitora la "Riserva" profonda (27-81cm) - sotto capacità di campo = allarme
reserve_depleting = False
if soil_moisture_27_81cm is not None:
reserve_depleting = soil_moisture_27_81cm < SOIL_MOISTURE_FIELD_CAPACITY
# Calcola fabbisogno idrico basato su ET₀
daily_water_demand = et0_avg if et0_avg is not None else 0.0
estimated_deficit = daily_water_demand * 2.0 # Fabbisogno stimato 2 giorni (approssimativo)
# Confronta con precipitazioni previste
rain_covers_demand = next_2_days_rain > estimated_deficit
# LOGIC DECISIONALE - 4 livelli
# 🔴 CRITICO (Deep Stress)
is_critical = False
if heart_moisture is not None:
# Umidità 9-27cm vicina al punto di avvizzimento O Riserva in calo
if (soil_moisture_9_27cm is not None and
soil_moisture_9_27cm <= SOIL_MOISTURE_DEEP_STRESS):
is_critical = True
elif reserve_depleting:
is_critical = True
if is_critical and daily_water_demand > 3.0 and future_rain_mm < PRECIP_THRESHOLD_SIGNIFICANT:
advice_level = "CRITICAL"
advice_msg = "🔴 **LIVELLO CRITICO (Deep Stress)**\n\n"
if soil_moisture_9_27cm is not None and soil_moisture_9_27cm <= SOIL_MOISTURE_DEEP_STRESS:
advice_msg += f"Umidità profonda (9-27cm) sotto trigger ({soil_moisture_9_27cm*100:.0f}% ≤ {SOIL_MOISTURE_DEEP_STRESS*100:.0f}%). "
if reserve_depleting:
advice_msg += f"Riserva profonda (27-81cm) in calo: {soil_moisture_27_81cm*100:.0f}%. "
advice_msg += f"ET₀ elevato ({daily_water_demand:.1f} mm/d). Nessuna pioggia prevista.\n\n"
advice_msg += "**Emergenza**: Irrigazione profonda necessaria **subito**. Portare umidità verso capacità di campo (~{:.0f}%) senza superarla (evitare saturazione e percolazione). Con argilla: ciclo lungo e lento, niente brevi rinfrescate.".format(SOIL_MOISTURE_FIELD_CAPACITY * 100)
# 🟠 STANDARD (Maintenance) - tra trigger e capacità di campo
elif (heart_moisture is not None and
heart_moisture < SOIL_MOISTURE_FIELD_CAPACITY * 0.8 and
(soil_moisture_9_27cm is None or soil_moisture_9_27cm > SOIL_MOISTURE_DEEP_STRESS)):
advice_level = "STANDARD"
advice_msg = "🟠 **LIVELLO STANDARD (Maintenance)**\n\n"
advice_msg += "Umidità in calo verso il trigger, profondo (9-27cm) ancora ok. "
if et0_avg is not None:
advice_msg += f"ET₀ moderato ({et0_avg:.1f} mm/d). "
if rain_covers_demand:
advice_msg += f"Pioggia prevista domani/dopodomani ({next_2_days_rain:.1f}mm) dovrebbe coprire il fabbisogno.\n\n"
advice_msg += "**Consiglio**: Attendi le precipitazioni, poi valuta."
else:
advice_msg += "Nessuna pioggia sufficiente prevista a breve.\n\n"
advice_msg += "**Routine**: Ciclo lungo e lento consigliato (argilla: bagnare in profondità, meno frequente)."
# 🟡 LIGHT (Surface Dry) - su argilla 0-3cm si secca e crepa; non indicativo, evitare brevi rinfrescate
elif (soil_moisture_0_1cm is not None and soil_moisture_0_1cm < 0.5 and
heart_moisture is not None and heart_moisture >= SOIL_MOISTURE_FIELD_CAPACITY * 0.8):
advice_level = "LIGHT"
advice_msg = "🟡 **LIVELLO LIGHT (Surface Dry)**\n\n"
advice_msg += "Solo strati superficiali (0-3cm) secchi, radici profonde (9-27cm) ok. "
if SOIL_IS_CLAY:
advice_msg += "Su argilla lo strato superficiale si secca e crepa in fretta: non è indicativo. "
if future_rain_mm >= PRECIP_THRESHOLD_SIGNIFICANT:
advice_msg += f"Pioggia prevista: {future_rain_mm:.1f}mm.\n\n"
advice_msg += "**Opzionale**: Attendere precipitazioni." + (" Con argilla evitare brevi rinfrescate (preferire cicli lunghi quando serve)." if SOIL_IS_CLAY else " Breve rinfrescata o attendi precipitazioni.")
else:
advice_msg += "\n\n**Opzionale**: " + ("Attendi prossimo ciclo completo (argilla: niente brevi rinfrescate)." if SOIL_IS_CLAY else "Breve rinfrescata superficiale o attendi domani.")
# 🟢 STOP (a capacità di campo o oltre, oppure pioggia copre fabbisogno)
else:
advice_level = "NO_ACTION"
advice_msg = "🟢 **LIVELLO STOP (Saturated/Rain)**\n\n"
if heart_moisture is not None and heart_moisture >= SOIL_MOISTURE_FIELD_CAPACITY * 0.9:
advice_msg += "Terreno a capacità di campo o oltre (evitare saturazione: perdita nutrienti, asfissia radicale). "
if rain_covers_demand or future_rain_mm >= PRECIP_THRESHOLD_SIGNIFICANT:
advice_msg += f"Pioggia prevista ({future_rain_mm:.1f}mm) > fabbisogno calcolato ({estimated_deficit:.1f}mm). "
if rainy_days:
advice_msg += f"Giorni: {', '.join(rainy_days)}. "
advice_msg += "\n\n**Stop**: Non irrigare, ci pensa la natura."
# Soil status summary
soil_summary_parts = []
if soil_moisture_3_9cm is not None:
soil_summary_parts.append(f"Umidità 3-9cm: {soil_moisture_3_9cm*100:.0f}%")
if soil_moisture_9_27cm is not None:
soil_summary_parts.append(f"Umidità 9-27cm: {soil_moisture_9_27cm*100:.0f}%")
if soil_moisture_27_81cm is not None:
soil_summary_parts.append(f"Riserva 27-81cm: {soil_moisture_27_81cm*100:.0f}%")
if et0_avg is not None:
soil_summary_parts.append(f"ET₀: {et0_avg:.1f}mm/d")
soil_status_summary = " | ".join(soil_summary_parts) if soil_summary_parts else "Dati limitati"
return {
"season_phase": "ACTIVE",
"advice_level": advice_level,
"human_message": advice_msg,
"soil_status_summary": soil_status_summary,
"status_display": status
}
def generate_shutdown_advice(
soil_temp_6cm: Optional[float],
soil_moisture_9_27cm: Optional[float],
high_moisture_streak: int,
sunshine_hours: Optional[float] = None,
shortwave_avg: Optional[float] = None,
state: Optional[Dict] = None
) -> Dict:
"""
FASE CHIUSURA: "Quando spegnere?"
Trigger: Crollo Termico + Segnale Luce + Saturazione
Returns: Dict con season_phase, advice_level, human_message, soil_status_summary
"""
state = state or {}
status = "**Fase: Chiusura Autunnale**"
# TRIGGER 1: Crollo Termico - Soil Temperature (6cm) < 10°C stabilmente
temp_below = False
if soil_temp_6cm is not None:
temp_below = soil_temp_6cm < SOIL_TEMP_SHUTDOWN_THRESHOLD
# Verifica se è stabile (controlla storico)
temp_history = state.get("soil_temp_history", [])
now = now_local()
recent_below_count = 0
for date_str, temp in temp_history:
try:
date_obj = datetime.datetime.fromisoformat(date_str).replace(tzinfo=TZINFO)
days_ago = (now - date_obj).days
if days_ago <= 3 and temp < SOIL_TEMP_SHUTDOWN_THRESHOLD:
recent_below_count += 1
except Exception:
continue
if temp_below:
recent_below_count += 1
temp_below = recent_below_count >= 2 # Almeno 2 giorni consecutivi
# TRIGGER 2: Segnale Luce - Sunshine Duration in calo drastico
light_declining = False
if sunshine_hours is not None:
# Fotoperiodo sotto 6 ore indica calo drastico (inizio dormienza)
light_declining = sunshine_hours < 6.0
# TRIGGER 3: Saturazione - Soil Moisture (9-27cm) alta costantemente
saturation_ok = False
if (soil_moisture_9_27cm is not None and
soil_moisture_9_27cm >= SOIL_MOISTURE_AUTUMN_HIGH and
high_moisture_streak >= AUTUMN_HIGH_MOISTURE_DAYS):
saturation_ok = True
# Genera consiglio
if temp_below or (light_declining and saturation_ok):
advice_level = "NO_ACTION"
advice_msg = "❄️ **CESSATA NECESSITÀ DI IRRIGAZIONE**\n\n"
advice_msg += "Precipitazioni e temperature in calo rendono superfluo irrigare. "
if temp_below:
advice_msg += f"Temperatura suolo {soil_temp_6cm:.1f}°C < {SOIL_TEMP_SHUTDOWN_THRESHOLD}°C stabilmente. "
if light_declining:
advice_msg += f"Fotoperiodo in calo ({sunshine_hours:.1f}h di sole). "
if saturation_ok:
advice_msg += f"Umidità alta ({soil_moisture_9_27cm*100:.0f}%) da {high_moisture_streak} giorni. "
advice_msg += "\n\nLe piante sono in riposo vegetativo. **Puoi spegnere e svuotare l'impianto di irrigazione per l'inverno.** Il terreno non richiederà più irrigazione artificiale fino alla prossima primavera."
else:
advice_level = "STANDARD"
advice_msg = "🟡 **MONITORAGGIO CHIUSURA**\n\n"
advice_msg += "Stagione autunnale avanzata. Monitora attentamente:\n"
if soil_temp_6cm is not None:
advice_msg += f"• Temperatura suolo: {soil_temp_6cm:.1f}°C (soglia: {SOIL_TEMP_SHUTDOWN_THRESHOLD}°C)\n"
if sunshine_hours is not None:
advice_msg += f"• Fotoperiodo: {sunshine_hours:.1f}h (calo drastico se < 6h)\n"
if soil_moisture_9_27cm is not None:
advice_msg += f"• Umidità: {soil_moisture_9_27cm*100:.0f}% (soglia saturazione ≥{SOIL_MOISTURE_AUTUMN_HIGH*100:.0f}% per {AUTUMN_HIGH_MOISTURE_DAYS} giorni)\n"
advice_msg += "\n**Consiglio**: Continua il monitoraggio. Lo spegnimento è imminente."
# Soil status summary
soil_summary_parts = []
if soil_temp_6cm is not None:
soil_summary_parts.append(f"T.Suolo 6cm: {soil_temp_6cm:.1f}°C")
if soil_moisture_9_27cm is not None:
soil_summary_parts.append(f"Umidità 9-27cm: {soil_moisture_9_27cm*100:.0f}%")
if sunshine_hours is not None:
soil_summary_parts.append(f"Fotoperiodo: {sunshine_hours:.1f}h")
soil_status_summary = " | ".join(soil_summary_parts) if soil_summary_parts else "Dati limitati"
return {
"season_phase": "CLOSING",
"advice_level": advice_level,
"human_message": advice_msg,
"soil_status_summary": soil_status_summary,
"status_display": status
}
def generate_dormant_advice() -> Dict:
"""
FASE DORMIENTE (Inverno)
Returns: Dict con season_phase, advice_level, human_message, soil_status_summary
"""
status = "**Fase: Riposo Invernale**"
advice_msg = "❄️ **IMPIANTO SPENTO**\n"
advice_msg += "Stagione invernale. Le piante sono in riposo vegetativo completo.\n"
advice_msg += "**Consiglio**: L'impianto di irrigazione dovrebbe essere già svuotato e spento. "
advice_msg += "Nessuna irrigazione necessaria fino alla prossima primavera."
return {
"season_phase": "DORMANT",
"advice_level": "NO_ACTION",
"human_message": advice_msg,
"soil_status_summary": "Dormienza invernale",
"status_display": status
}
# =============================================================================
# CHART (grafico giorni precedenti e successivi)
# =============================================================================
def _build_chart_data(
daily_history: List[Dict],
hourly: Dict,
daily: Dict,
times: List,
current_idx: int,
now: datetime.datetime,
) -> Tuple[List[str], List[Optional[float]], List[Optional[float]], List[Optional[float]], List[Optional[float]], List[Optional[float]]]:
"""Costruisce serie giornaliere: date, temp_6, moist_9_27, moist_27_81, et0, precip."""
dates: List[str] = []
temp_6: List[Optional[float]] = []
moist_9_27: List[Optional[float]] = []
moist_27_81: List[Optional[float]] = []
et0: List[Optional[float]] = []
precip: List[Optional[float]] = []
# Passato: da daily_history (max 7)
seen_dates: set = set()
for h in daily_history:
d = h.get("date", "")
if d:
seen_dates.add(d)
dates.append(d)
temp_6.append(h.get("temp_6"))
moist_9_27.append(h.get("moist_9_27"))
moist_27_81.append(h.get("moist_27_81"))
et0.append(h.get("et0"))
precip.append(h.get("precip"))
# Futuro: da hourly (aggregato per giorno) e daily — salta date già presenti (evita duplicato "oggi")
daily_times = daily.get("time", []) or []
et0_sum = daily.get("et0_fao_evapotranspiration_sum", []) or []
precip_sum = daily.get("precipitation_sum", []) or []
soil_temp_6 = hourly.get("soil_temperature_6cm", []) or hourly.get("soil_temperature_0cm", []) or []
soil_moist_9_27 = hourly.get("soil_moisture_9_to_27cm", []) or []
soil_moist_27_81 = hourly.get("soil_moisture_27_to_81cm", []) or []
for day_offset in range(8):
start = current_idx + day_offset * 24
end = min(start + 24, len(times))
if start >= len(times):
break
try:
day_time = parse_time_to_local(times[start])
date_str = day_time.date().isoformat()
if date_str in seen_dates:
continue
seen_dates.add(date_str)
dates.append(date_str)
vals_t = [float(soil_temp_6[i]) for i in range(start, end) if i < len(soil_temp_6) and soil_temp_6[i] is not None]
vals_9 = [float(soil_moist_9_27[i]) for i in range(start, end) if i < len(soil_moist_9_27) and soil_moist_9_27[i] is not None]
vals_27 = [float(soil_moist_27_81[i]) for i in range(start, end) if i < len(soil_moist_27_81) and soil_moist_27_81[i] is not None]
temp_6.append(sum(vals_t) / len(vals_t) if vals_t else None)
moist_9_27.append(sum(vals_9) / len(vals_9) if vals_9 else None)
moist_27_81.append(sum(vals_27) / len(vals_27) if vals_27 else None)
et0_val = None
precip_val = None
for i, d in enumerate(daily_times):
if d and str(d).startswith(date_str[:10]):
et0_val = float(et0_sum[i]) if i < len(et0_sum) and et0_sum[i] is not None else None
precip_val = float(precip_sum[i]) if i < len(precip_sum) and precip_sum[i] is not None else None
break
et0.append(et0_val)
precip.append(precip_val)
except Exception:
continue
return dates, temp_6, moist_9_27, moist_27_81, et0, precip
def build_irrigation_chart_bytes(
daily_history: List[Dict],
hourly: Dict,
daily: Dict,
times: List,
current_idx: int,
now: datetime.datetime,
) -> Optional[bytes]:
"""Genera grafico PNG (temperatura e umidità suolo 9-27/27-81, ET0, precipitazioni). Ritorna bytes o None se matplotlib assente."""
try:
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
except ImportError:
return None
dates, temp_6, moist_9_27, moist_27_81, et0_list, precip_list = _build_chart_data(
daily_history, hourly, daily, times, current_idx, now
)
if not dates:
return None
x = list(range(len(dates)))
labels = [d[8:10] + "/" + d[5:7] if len(d) >= 10 else d for d in dates]
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 6), sharex=True, gridspec_kw={"height_ratios": [1.2, 1]})
# Subplot 1: T suolo 6cm + Umidità 9-27 e 27-81
ax1.set_ylabel("T° suolo (°C) / Umidità (%)")
t_vals = [float(t) if t is not None else float("nan") for t in temp_6]
ax1.plot(x, t_vals, "o-", color="C0", label="T suolo 6cm", markersize=4)
m9 = [float(m)*100 if m is not None else float("nan") for m in moist_9_27]
m27 = [float(m)*100 if m is not None else float("nan") for m in moist_27_81]
ax1.plot(x, m9, "s-", color="C2", label="Umidità 9-27cm", markersize=4)
ax1.plot(x, m27, "^-", color="C3", label="Umidità 27-81cm", markersize=4)
ax1.axhline(y=SOIL_MOISTURE_DEEP_STRESS * 100, color="gray", linestyle="--", alpha=0.7, label=f"Trigger {SOIL_MOISTURE_DEEP_STRESS*100:.0f}%")
ax1.axhline(y=SOIL_MOISTURE_WILTING_POINT * 100, color="brown", linestyle="--", alpha=0.7, label=f"Appassimento {SOIL_MOISTURE_WILTING_POINT*100:.0f}%")
# Linea verticale "oggi"
today_iso = now.date().isoformat()
now_idx = None
for i, d in enumerate(dates):
if d and str(d).startswith(today_iso[:10]):
now_idx = i
break
if now_idx is not None:
ax1.axvline(x=now_idx, color="red", linewidth=1, linestyle="-", alpha=0.9)
ax1.legend(loc="upper right", fontsize=7)
ax1.grid(True, alpha=0.3)
ax1.set_ylim(bottom=0)
# Subplot 2: ET0 e Precipitazioni
ax2.set_ylabel("ET₀ (mm) / Precip (mm)")
et0_vals = [float(e) if e is not None else 0.0 for e in et0_list]
precip_vals = [float(p) if p is not None else 0.0 for p in precip_list]
ax2.bar([i - 0.2 for i in x], et0_vals, 0.35, label="ET₀", color="C0", alpha=0.8)
ax2.bar([i + 0.2 for i in x], precip_vals, 0.35, label="Precip", color="C1", alpha=0.8)
if now_idx is not None:
ax2.axvline(x=now_idx, color="red", linewidth=1, linestyle="-", alpha=0.9)
ax2.legend(loc="upper right", fontsize=7)
ax2.grid(True, alpha=0.3)
ax2.set_ylim(bottom=0)
plt.xticks(x, labels, rotation=45, ha="right")
plt.tight_layout()
import io
buf = io.BytesIO()
plt.savefig(buf, format="png", dpi=100, bbox_inches="tight")
plt.close()
buf.seek(0)
return buf.read()
# =============================================================================
# MAIN ANALYSIS
# =============================================================================
IRRIGATION_NEED_DELTA_SIGNIFICANT = 25 # Variazione indice 0-100 per "cambio significativo"
DAYS_WEEKLY_SUMMER = 7 # Cadenza settimanale in fase attiva
def should_send_auto_report(
phase: str,
soil_temp_6cm: Optional[float],
state: Dict,
force_debug: bool = False,
context: Optional[Dict] = None
) -> Tuple[bool, str]:
"""
Determina se inviare un report automatico:
a) Una tantum: prima uscita da dormiente prevista nel mese.
b) Estate: cadenza settimanale o variazioni significative del fabbisogno.
c) Una tantum: cessata necessità irrigazione a fine stagione.
context: will_exit_dormant_in_forecast, current_month_iso (YYYY-MM),
irrigation_need_today, irrigation_need_next_days, days_since_last_report.
"""
if force_debug:
return True, "DEBUG MODE"
ctx = context or {}
now = now_local()
current_month_iso = now.strftime("%Y-%m")
will_exit = ctx.get("will_exit_dormant_in_forecast", False)
need_today = ctx.get("irrigation_need_today")
need_next = ctx.get("irrigation_need_next_days") or []
last_report = state.get("last_auto_report_date")
days_since_last = 999
if last_report:
try:
d = datetime.date.fromisoformat(last_report)
days_since_last = (now.date() - d).days
except Exception:
pass
if phase == "dormant":
if will_exit and state.get("wakeup_notified_for_month") != current_month_iso:
state["wakeup_notified_for_month"] = current_month_iso
state["last_auto_report_date"] = now.date().isoformat()
return True, "PRIMO_RISVEGLIO_MESE"
return False, "DORMANT_SILENT"
if phase == "wakeup":
if state.get("wakeup_notified_for_month") != current_month_iso:
state["wakeup_notified_for_month"] = current_month_iso
state["last_auto_report_date"] = now.date().isoformat()
return True, "PRIMO_RISVEGLIO_MESE"
return False, "WAKEUP_GIÀ_NOTIFICATO"
if phase == "active":
state["shutdown_confirmed"] = False
if days_since_last >= DAYS_WEEKLY_SUMMER:
state["last_auto_report_date"] = now.date().isoformat()
if need_today is not None:
state["last_irrigation_need"] = need_today
return True, "CADENZA_SETTIMANALE"
if need_next and need_today is not None:
need_min = min([need_today] + need_next)
need_max = max([need_today] + need_next)
if (need_max - need_min) >= IRRIGATION_NEED_DELTA_SIGNIFICANT:
state["last_auto_report_date"] = now.date().isoformat()
state["last_irrigation_need"] = need_today
return True, "VARIAZIONE_FABBISOGNO"
last_need = state.get("last_irrigation_need")
if last_need is not None and need_today is not None and abs(need_today - last_need) >= IRRIGATION_NEED_DELTA_SIGNIFICANT:
state["last_auto_report_date"] = now.date().isoformat()
state["last_irrigation_need"] = need_today
return True, "VARIAZIONE_FABBISOGNO"
return False, "ACTIVE_NESSUNA_VARIAZIONE"
if phase == "shutdown":
if state.get("shutdown_confirmed", False):
return False, "SHUTDOWN_GIÀ_NOTIFICATO"
state["shutdown_confirmed"] = True
state["last_auto_report_date"] = now.date().isoformat()
return True, "SHUTDOWN_CESSATA_NECESSITÀ"
return False, "UNKNOWN_PHASE"
def analyze_irrigation(
lat: float = DEFAULT_LAT,
lon: float = DEFAULT_LON,
location_name: str = DEFAULT_LOCATION_NAME,
timezone: str = TZ,
debug_mode: bool = False,
force_send: bool = False
) -> Tuple[str, bool]:
"""
Analisi principale e generazione report.
Returns: (report, should_send_auto, chart_bytes o None)
"""
LOGGER.info("=== Analisi Irrigazione per %s ===", location_name)
# Carica stato precedente
state = load_state()
# Recupera dati
data = fetch_soil_and_weather(lat, lon, timezone)
if not data:
LOGGER.warning("Fetch dati meteo/suolo fallito: nessun dato disponibile")
return ("❌ **ERRORE**\n\nImpossibile recuperare dati meteorologici. Riprova più tardi.", False, None)
hourly = data.get("hourly", {}) or {}
daily = data.get("daily", {}) or {}
# Estrai dati attuali (primi valori)
times = hourly.get("time", []) or []
if not times:
LOGGER.warning("Nessun dato temporale nelle risposte API")
return ("❌ **ERRORE**\n\nNessun dato temporale disponibile.", False, None)
now = now_local()
current_idx = 0
for i, t_str in enumerate(times):
try:
t = parse_time_to_local(t_str)
if t >= now:
current_idx = i
break
except Exception:
continue
# Dati suolo (ICON Seamless: tutti i layer; fallback ICON Italia: 0-1, 3-9, 9-27, 27-81)
soil_temp_0cm_list = hourly.get("soil_temperature_0cm", []) or []
soil_temp_6cm_list = hourly.get("soil_temperature_6cm", []) or []
soil_temp_18cm_list = hourly.get("soil_temperature_18cm", []) or []
soil_temp_54cm_list = hourly.get("soil_temperature_54cm", []) or []
soil_moisture_0_1_list = hourly.get("soil_moisture_0_to_1cm", []) or []
soil_moisture_3_9_list = hourly.get("soil_moisture_3_to_9cm", []) or []
soil_moisture_9_27_list = hourly.get("soil_moisture_9_to_27cm", []) or []
soil_moisture_81_243_list = hourly.get("soil_moisture_81_to_243cm", []) or []
soil_moisture_27_81_list = hourly.get("soil_moisture_27_to_81cm", []) or []
precip_list = hourly.get("precipitation", []) or []
snowfall_list = hourly.get("snowfall", []) or []
et0_list = hourly.get("et0_fao_evapotranspiration", []) or []
vpd_list = hourly.get("vapour_pressure_deficit", []) or [] # Stress idrico
sunshine_list = hourly.get("sunshine_duration", []) or []
humidity_list = hourly.get("relative_humidity_2m", []) or [] # Umidità relativa aria
shortwave_rad_list = hourly.get("shortwave_radiation", []) or [] # GHI - Global Horizontal Irradiance
temp_2m_list = hourly.get("temperature_2m", []) or [] # Temperatura aria (per veto gelo)
def _at(idx, lst):
if lst and idx < len(lst) and lst[idx] is not None:
try:
return float(lst[idx])
except (TypeError, ValueError):
pass
return None
# Temperatura suolo: preferisci 6cm/18cm se presenti (ICON Seamless), altrimenti 0cm/54cm
soil_temp_6cm = _at(current_idx, soil_temp_6cm_list) or _at(current_idx, soil_temp_0cm_list)
soil_temp_18cm = _at(current_idx, soil_temp_18cm_list) or _at(current_idx, soil_temp_54cm_list)
# Umidità: layer 0-1, 3-9, 9-27, 27-81 (ICON Seamless o Italia)
soil_moisture_0_1cm = _at(current_idx, soil_moisture_0_1_list)
soil_moisture_3_9cm = _at(current_idx, soil_moisture_3_9_list) or soil_moisture_0_1cm
soil_moisture_9_27cm = _at(current_idx, soil_moisture_9_27_list) or _at(current_idx, soil_moisture_81_243_list)
soil_moisture_27_81cm = _at(current_idx, soil_moisture_27_81_list)
# Parametri aggiuntivi per calcolo stress idrico
vpd_avg = None # Vapour Pressure Deficit medio (24h)
vpd_values = []
for i in range(current_idx, min(current_idx + 24, len(vpd_list))):
if i < len(vpd_list) and vpd_list[i] is not None:
try:
vpd_values.append(float(vpd_list[i]))
except Exception:
continue
if vpd_values:
vpd_avg = sum(vpd_values) / len(vpd_values)
sunshine_hours = None # Ore di sole previste (24h)
sunshine_total = 0.0
for i in range(current_idx, min(current_idx + 24, len(sunshine_list))):
if i < len(sunshine_list) and sunshine_list[i] is not None:
try:
sunshine_total += float(sunshine_list[i])
except Exception:
continue
if sunshine_total > 0:
sunshine_hours = sunshine_total / 3600.0 # Converti secondi in ore
# Umidità relativa aria media (24h)
humidity_avg = None
humidity_values = []
for i in range(current_idx, min(current_idx + 24, len(humidity_list))):
if i < len(humidity_list) and humidity_list[i] is not None:
try:
humidity_values.append(float(humidity_list[i]))
except Exception:
continue
if humidity_values:
humidity_avg = sum(humidity_values) / len(humidity_values)
# Shortwave Radiation GHI media (24h) - energia per fotosintesi
shortwave_avg = None
shortwave_values = []
for i in range(current_idx, min(current_idx + 24, len(shortwave_rad_list))):
if i < len(shortwave_rad_list) and shortwave_rad_list[i] is not None:
try:
shortwave_values.append(float(shortwave_rad_list[i]))
except Exception:
continue
if shortwave_values:
shortwave_avg = sum(shortwave_values) / len(shortwave_values) # W/m²
# ET₀ medio (calcola su prossime 24h)
et0_avg = None
et0_values = []
for i in range(current_idx, min(current_idx + 24, len(et0_list))):
if i < len(et0_list) and et0_list[i] is not None:
try:
et0_values.append(float(et0_list[i]))
except Exception:
continue
if et0_values:
et0_avg = sum(et0_values) / len(et0_values)
# Previsioni pioggia
future_rain_total, rainy_days = check_future_rainfall(daily, days_ahead=5)
next_2_days_rain, _ = check_future_rainfall(daily, days_ahead=2)
# Temperatura aria min (prossime 24h) per veto gelo
air_temp_min_24h = None
for i in range(current_idx, min(current_idx + 24, len(temp_2m_list))):
if i < len(temp_2m_list) and temp_2m_list[i] is not None:
try:
v = float(temp_2m_list[i])
air_temp_min_24h = v if air_temp_min_24h is None else min(air_temp_min_24h, v)
except (TypeError, ValueError):
pass
freeze_veto = air_temp_min_24h is not None and air_temp_min_24h < AIR_TEMP_FREEZE_VETO
rain_veto = (
state.get("last_24h_precip_mm") is not None
and float(state["last_24h_precip_mm"]) >= PRECIP_VETO_MM_24H
)
# Modello a serbatoio (bucket): aggiorna bilancio una volta per giorno con ET0 e pioggia di oggi
today_iso = now.date().isoformat()
daily_times = daily.get("time", []) or []
et0_sum = daily.get("et0_fao_evapotranspiration_sum", []) or []
precip_sum = daily.get("precipitation_sum", []) or []
balance = float(state.get("water_balance_mm", WATER_BALANCE_MAX_MM))
last_balance_date = state.get("last_balance_date")
if last_balance_date != today_iso and daily_times:
day_idx = None
for i, d in enumerate(daily_times):
if d and str(d).startswith(today_iso[:10]):
day_idx = i
break
if day_idx is not None:
et0_day = float(et0_sum[day_idx]) if day_idx < len(et0_sum) and et0_sum[day_idx] is not None else 0.0
precip_day = float(precip_sum[day_idx]) if day_idx < len(precip_sum) and precip_sum[day_idx] is not None else 0.0
balance = balance - et0_day * KC_LAWN + precip_day
balance = max(0.0, min(WATER_BALANCE_MAX_MM, balance))
state["water_balance_mm"] = balance
state["last_balance_date"] = today_iso
suggested_minutes = None
if balance < WATER_BALANCE_CRITICAL_MM:
deficit_mm = WATER_BALANCE_MAX_MM - balance
suggested_minutes = int(round(deficit_mm * 60.0 / APPLIED_MM_PER_HOUR))
suggested_minutes = max(5, min(120, suggested_minutes)) # Clamp 5-120 min
# Storico giornaliero per grafico (ultimi 7 giorni): aggiungi oggi e tronca
day_et0 = None
day_precip = None
if daily_times and et0_sum and precip_sum:
for i, d in enumerate(daily_times):
if d and str(d).startswith(today_iso[:10]):
day_et0 = float(et0_sum[i]) if i < len(et0_sum) and et0_sum[i] is not None else None
day_precip = float(precip_sum[i]) if i < len(precip_sum) and precip_sum[i] is not None else None
break
today_snapshot = {
"date": today_iso,
"temp_6": soil_temp_6cm,
"moist_9_27": soil_moisture_9_27cm,
"moist_27_81": soil_moisture_27_81cm,
"et0": day_et0,
"precip": day_precip,
}
daily_history = list(state.get("daily_history", []) or [])
# Rimuovi eventuale entry duplicata per oggi
daily_history = [h for h in daily_history if h.get("date") != today_iso]
daily_history.append(today_snapshot)
daily_history = daily_history[-7:]
state["daily_history"] = daily_history
# Determina fase stagionale
month = now.month
phase = determine_seasonal_phase(
month, soil_temp_6cm, soil_moisture_3_9cm, soil_moisture_9_27cm, state
)
# Contesto per notifiche automatiche (uscita dormiente, fabbisogno, cadenza)
heart_moisture = None
if soil_moisture_3_9cm is not None and soil_moisture_9_27cm is not None:
heart_moisture = 0.4 * soil_moisture_3_9cm + 0.6 * soil_moisture_9_27cm
elif soil_moisture_9_27cm is not None:
heart_moisture = soil_moisture_9_27cm
elif soil_moisture_3_9cm is not None:
heart_moisture = soil_moisture_3_9cm
report_ctx = {
"will_exit_dormant_in_forecast": _will_exit_dormant_in_forecast(hourly, times, now),
"irrigation_need_today": _irrigation_need_index(heart_moisture, et0_avg, vpd_avg),
"irrigation_need_next_days": _irrigation_need_next_days(daily, heart_moisture, 3),
}
last_rep = state.get("last_auto_report_date")
if last_rep:
try:
report_ctx["days_since_last_report"] = (now.date() - datetime.date.fromisoformat(last_rep)).days
except Exception:
report_ctx["days_since_last_report"] = 999
else:
report_ctx["days_since_last_report"] = 999
# Determina se inviare report automatico
should_send, reason = should_send_auto_report(
phase, soil_temp_6cm, state, force_debug=debug_mode, context=report_ctx
)
# Aggiorna stato
state["phase"] = phase
state["last_check"] = now.isoformat()
# Aggiungi a storico (mantieni ultimi 7 giorni)
# Usa soil_temp_0cm per storico (mappato come 6cm nella logica)
today_str = now.date().isoformat()
state["soil_temp_history"] = [
(d, t) for d, t in state.get("soil_temp_history", [])
if (now.date() - datetime.date.fromisoformat(d)).days <= 7
]
if soil_temp_6cm is not None: # Questa è già mappata da soil_temp_0cm
state["soil_temp_history"].append((today_str, soil_temp_6cm))
# Aggiorna streak umidità alta
if (soil_moisture_9_27cm is not None and
soil_moisture_9_27cm >= SOIL_MOISTURE_AUTUMN_HIGH):
state["high_moisture_streak"] = state.get("high_moisture_streak", 0) + 1
else:
state["high_moisture_streak"] = 0
# Genera consiglio in base alla fase (restituisce Dict con JSON structure)
advice_dict = None
if phase == "wakeup":
advice_dict = generate_wakeup_advice(
soil_temp_6cm, soil_moisture_3_9cm, soil_moisture_9_27cm,
future_rain_total, rainy_days,
shortwave_avg=shortwave_avg,
sunshine_hours=sunshine_hours,
state=state
)
elif phase == "active":
advice_dict = generate_active_advice(
soil_moisture_0_1cm=soil_moisture_3_9cm, # Mappa 0-1cm → 3-9cm per retrocompatibilità
soil_moisture_3_9cm=soil_moisture_3_9cm,
soil_moisture_9_27cm=soil_moisture_9_27cm,
soil_moisture_27_81cm=soil_moisture_27_81cm, # None se non disponibile
future_rain_mm=future_rain_total,
rainy_days=rainy_days,
et0_avg=et0_avg,
next_2_days_rain=next_2_days_rain,
vpd_avg=vpd_avg
)
elif phase == "shutdown":
advice_dict = generate_shutdown_advice(
soil_temp_6cm, soil_moisture_9_27cm, state.get("high_moisture_streak", 0),
sunshine_hours=sunshine_hours,
shortwave_avg=shortwave_avg,
state=state
)
else: # dormant
advice_dict = generate_dormant_advice()
# Estrai status e advice dal dict per retrocompatibilità con report text
status = advice_dict.get("status_display", "**Fase: Sconosciuta**")
advice = advice_dict.get("human_message", "Analisi in corso...")
# Il dict contiene anche: season_phase, advice_level, soil_status_summary (per JSON output)
# Calcola trend per temperatura e umidità (ultimi 7 giorni dallo storico)
temp_trend = None
moisture_trend_3_9 = None
moisture_trend_9_27 = None
temp_history = state.get("soil_temp_history", [])
if len(temp_history) >= 2 and soil_temp_6cm is not None:
try:
# Confronta con valore di 7 giorni fa (se disponibile)
week_ago_date = (now.date() - datetime.timedelta(days=7)).isoformat()
old_temp = None
for date_str, temp_val in temp_history:
if date_str == week_ago_date:
old_temp = temp_val
break
if old_temp is not None:
diff = soil_temp_6cm - old_temp
if abs(diff) > 0.1:
temp_trend = f"{diff:+.1f}°C" if diff > 0 else f"{diff:.1f}°C"
except Exception:
pass
# Riga umidità 9-27 e 27-81 in evidenza (cuore e riserva); se mancano mostriamo "—"
moisture_summary_parts = []
moisture_summary_parts.append(f"9-27cm: {soil_moisture_9_27cm*100:.0f}%" if soil_moisture_9_27cm is not None else "9-27cm: —")
moisture_summary_parts.append(f"27-81cm: {soil_moisture_27_81cm*100:.0f}%" if soil_moisture_27_81cm is not None else "27-81cm: —")
moisture_summary_line = "💧 " + " | ".join(moisture_summary_parts) + "\n"
# Pianificazione prossimi 8 giorni (da hourly: medie giornaliere 9-27cm, prima necessità sotto trigger)
planning_8d_line = ""
if times and soil_moisture_9_27_list and len(soil_moisture_9_27_list) >= current_idx + 24:
day_avgs = []
first_below_trigger_day = None
for day_offset in range(8):
start = current_idx + day_offset * 24
end = min(start + 24, len(soil_moisture_9_27_list))
vals = []
for i in range(start, end):
if i < len(soil_moisture_9_27_list) and soil_moisture_9_27_list[i] is not None:
try:
vals.append(float(soil_moisture_9_27_list[i]))
except (TypeError, ValueError):
pass
if vals:
avg = sum(vals) / len(vals)
day_avgs.append((day_offset, avg))
if first_below_trigger_day is None and avg < SOIL_MOISTURE_DEEP_STRESS:
first_below_trigger_day = day_offset
if day_avgs:
parts = [f"G{d+1}:{a*100:.0f}%" for d, a in day_avgs[:8]]
planning_8d_line = "📅 Prossimi 8 gg (9-27cm): " + " ".join(parts)
if first_below_trigger_day is not None and phase == "active":
planning_8d_line += f" — sotto trigger (~{SOIL_MOISTURE_DEEP_STRESS*100:.0f}%) stimato tra {first_below_trigger_day + 1} gg"
planning_8d_line += "\n"
# Consigli orario e daytime lock (E, F)
timing_advice = []
if phase == "active":
timing_advice.append("Preferire irrigazione in mattinata (alba) se le notti sono calde (minor rischio fungino).")
if shortwave_avg is not None and shortwave_avg > SHORTWAVE_DAYTIME_LOCK_WM2:
timing_advice.append(f"Evitare irrigazione in pieno sole (radiazione >{SHORTWAVE_DAYTIME_LOCK_WM2:.0f} W/m²); preferire alba o tardo pomeriggio.")
# Veti in evidenza
veto_lines = []
if freeze_veto:
veto_lines.append(f"❄️ **VETO GELO**: T aria min 24h = {air_temp_min_24h:.1f}°C < {AIR_TEMP_FREEZE_VETO:.0f}°C — non irrigare.")
if rain_veto:
veto_lines.append(f"🌧️ **VETO PIOGGIA**: Ultime 24h ≥ {PRECIP_VETO_MM_24H:.0f} mm — non avviare irrigazione.")
# Colpo d'occhio (umidità e prossimi 8 gg sono nel grafico)
glance = [
status.strip(),
f"📍 {location_name} · {now.strftime('%d/%m/%Y %H:%M')}",
"",
]
if veto_lines:
glance.append("**Veti**")
glance.extend(veto_lines)
glance.append("")
if suggested_minutes is not None:
glance.append(f"**Irrigazione suggerita**: ~{suggested_minutes} min (ciclo lungo e lento, argilla)")
glance.append("")
# Costruisci report completo (strutturato)
report_parts = [
"\n".join(glance),
""*24,
"**Consiglio**",
advice,
"",
]
if timing_advice:
report_parts.append("**Orario** " + " · ".join(timing_advice))
# Salva stato
save_state(state)
# Grafico (giorni precedenti + successivi)
chart_bytes = None
try:
chart_bytes = build_irrigation_chart_bytes(
state.get("daily_history", []) or [],
hourly, daily, times, current_idx, now,
)
except Exception as e:
LOGGER.warning("Chart generation failed: %s", e)
report = "\n".join(report_parts)
LOGGER.info("Analisi completata. Fase: %s, Auto-send: %s (%s)", phase, should_send, reason)
return report, should_send if not force_send else True, chart_bytes
# =============================================================================
# TELEGRAM INTEGRATION (Optional)
# =============================================================================
def telegram_send_photo(photo_bytes: bytes, caption: str, chat_ids: Optional[List[str]] = None) -> bool:
"""Invia foto (PNG) a Telegram. caption in Markdown. Ritorna True se almeno un invio ok."""
token = load_bot_token()
if not token:
LOGGER.warning("Telegram token missing: photo not sent.")
return False
if chat_ids is None:
chat_ids = TELEGRAM_CHAT_IDS
url = "https://api.telegram.org/bot{}/sendPhoto".format(token)
sent_ok = False
import time
with requests.Session() as s:
for chat_id in chat_ids:
try:
r = s.post(
url,
data={"chat_id": chat_id, "caption": caption, "parse_mode": "Markdown"},
files={"photo": ("irrigazione.png", photo_bytes, "image/png")},
timeout=20,
)
if r.status_code == 200:
sent_ok = True
else:
LOGGER.error("Telegram sendPhoto chat_id=%s status=%s %s", chat_id, r.status_code, r.text[:200])
time.sleep(0.25)
except Exception as e:
LOGGER.exception("Telegram sendPhoto chat_id=%s err=%s", chat_id, e)
return sent_ok
TELEGRAM_MAX_MESSAGE_LENGTH = 4096 # Limite Telegram per messaggio
def _split_message_for_telegram(text: str, max_len: int = TELEGRAM_MAX_MESSAGE_LENGTH - 100) -> List[str]:
"""Spezza un messaggio in chunk sotto il limite Telegram (lascia margine per Markdown)."""
if len(text) <= max_len:
return [text] if text else []
chunks = []
while text:
if len(text) <= max_len:
chunks.append(text)
break
cut = text.rfind("\n", 0, max_len + 1)
if cut <= 0:
cut = max_len
chunks.append(text[:cut].strip())
text = text[cut:].lstrip()
return chunks
def telegram_send_markdown(message: str, chat_ids: Optional[List[str]] = None) -> bool:
"""Invia messaggio Telegram. Prova con Markdown; se Telegram risponde 400 (Markdown non valido), ritenta in testo piano."""
token = load_bot_token()
if not token:
LOGGER.warning("Telegram token missing: message not sent.")
return False
if chat_ids is None:
chat_ids = TELEGRAM_CHAT_IDS
chunks = _split_message_for_telegram(message)
if not chunks:
return True
url = f"https://api.telegram.org/bot{token}/sendMessage"
sent_ok = False
import time
with requests.Session() as s:
for chat_id in chat_ids:
for chunk in chunks:
payload = {
"chat_id": chat_id,
"text": chunk,
"parse_mode": "Markdown",
"disable_web_page_preview": True,
}
try:
resp = s.post(url, json=payload, timeout=15)
if resp.status_code == 200:
sent_ok = True
elif resp.status_code == 400:
# Markdown non valido (es. underscore in NO_ACTION): ritenta senza formattazione
payload_plain = {
"chat_id": chat_id,
"text": chunk,
"disable_web_page_preview": True,
}
resp2 = s.post(url, json=payload_plain, timeout=15)
if resp2.status_code == 200:
sent_ok = True
LOGGER.debug("Inviato come testo piano dopo 400 Markdown")
else:
LOGGER.error("Telegram error chat_id=%s anche senza Markdown status=%s %s",
chat_id, resp2.status_code, resp2.text[:300])
else:
LOGGER.error("Telegram error chat_id=%s status=%s body=%s",
chat_id, resp.status_code, resp.text[:500])
time.sleep(0.2)
except Exception as e:
LOGGER.exception("Telegram exception chat_id=%s err=%s", chat_id, e)
time.sleep(0.25)
return sent_ok
# =============================================================================
# MAIN
# =============================================================================
def main():
parser = argparse.ArgumentParser(
description="Smart Irrigation Advisor - Consulente Agronomico"
)
parser.add_argument("--lat", type=float, help="Latitudine (default: Casa)")
parser.add_argument("--lon", type=float, help="Longitudine (default: Casa)")
parser.add_argument("--location", help="Nome località (default: Casa)")
parser.add_argument("--timezone", help="Timezone IANA (default: Europe/Berlin)")
parser.add_argument("--telegram", action="store_true", help="Invia report via Telegram (solo se auto-reporting attivo o --force)")
parser.add_argument("--force", action="store_true", help="Forza invio anche se auto-reporting disabilitato")
parser.add_argument("--chat_id", help="Chat ID Telegram specifico (opzionale)")
parser.add_argument("--debug", action="store_true", help="Modalità debug (invia sempre e bypassa controlli)")
parser.add_argument("--auto", action="store_true", help="Modalità automatica (usa logica auto-reporting, invia via Telegram se attivo)")
args = parser.parse_args()
if args.debug:
global DEBUG
DEBUG = True
LOGGER.setLevel(logging.DEBUG)
lat = args.lat if args.lat is not None else DEFAULT_LAT
lon = args.lon if args.lon is not None else DEFAULT_LON
location = args.location if args.location else DEFAULT_LOCATION_NAME
timezone = args.timezone if args.timezone else TZ
run_mode = "auto" if args.auto else "manual"
LOGGER.info("Heartbeat: start mode=%s location=%s", run_mode, location)
if args.auto:
now_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"{now_str} INFO Heartbeat auto run for {location}")
# Determina modalità operativa
force_send = args.force or args.debug
use_auto_logic = args.auto or (not args.telegram and not args.force)
# Genera report (e eventuale grafico)
report, should_send_auto, chart_bytes = analyze_irrigation(
lat, lon, location, timezone,
debug_mode=args.debug,
force_send=force_send
)
# Output
send_to_telegram = False
if args.auto:
# Modalità automatica (cron): usa logica auto-reporting
if should_send_auto:
send_to_telegram = True
LOGGER.info("Auto-reporting attivo: invio via Telegram")
else:
LOGGER.info("Auto-reporting disabilitato: report non inviato (fase: %s)",
load_state().get("phase", "unknown"))
# In modalità auto, se non inviamo, non stampiamo neanche
if not args.debug:
return
elif args.telegram:
# Modalità manuale (chiamata da Telegram): SEMPRE invia se --telegram è presente
# La logica auto-reporting si applica solo a cron (--auto)
send_to_telegram = True
if force_send:
LOGGER.info("Chiamata manuale da Telegram con --force: invio forzato")
elif should_send_auto:
LOGGER.info("Chiamata manuale da Telegram: invio (auto-reporting attivo)")
else:
LOGGER.info("Chiamata manuale da Telegram: invio (bypass auto-reporting)")
if send_to_telegram:
chat_ids = None
if args.chat_id:
chat_ids = [args.chat_id.strip()]
else:
chat_ids = TELEGRAM_CHAT_IDS
# Invia prima il report testuale (così arriva anche se dopo c'è timeout), poi il grafico
success = telegram_send_markdown(report, chat_ids=chat_ids)
if not success:
print(report) # Fallback su stdout
LOGGER.error("Errore invio Telegram, stampato su stdout")
if chart_bytes:
caption = f"💧 *Irrigazione* · {location} · {datetime.datetime.now().strftime('%d/%m/%Y %H:%M')}"
_ = telegram_send_photo(chart_bytes, caption, chat_ids)
else:
# Stampa sempre su stdout se non in modalità auto e non Telegram
print(report)
if __name__ == "__main__":
main()