#!/usr/bin/env python3 # -*- coding: utf-8 -*- import argparse import datetime import json import logging import os import time from logging.handlers import RotatingFileHandler from typing import Dict, List, Optional, Tuple from zoneinfo import ZoneInfo import requests from dateutil import parser from open_meteo_client import configure_open_meteo_session # ============================================================================= # snow_radar.py # # Scopo: # Analizza la neve in una griglia di località in un raggio di 40km da San Marino. # Combina due parametri Open-Meteo: # - snowfall: precipitazione nevosa (cm/h) - neve che cade # - snow_depth: spessore manto al suolo (m) - neve accumulata, incluso residuo # senza precipitazione (es. giorni successivi a nevicata) # # Modello meteo: # italia_meteo_arpae_icon_2i (supporta snowfall e snow_depth) # # Token Telegram: # Nessun token in chiaro. Lettura in ordine: # 1) env TELEGRAM_BOT_TOKEN # 2) ~/.telegram_dpc_bot_token # 3) /etc/telegram_dpc_bot_token # # Debug: # python3 snow_radar.py --debug # # Log: # ./snow_radar.log (stessa cartella dello script) # ============================================================================= DEBUG = os.environ.get("DEBUG", "0").strip() == "1" # ----------------- TELEGRAM ----------------- TELEGRAM_CHAT_IDS = ["64463169", "24827341", "132455422", "5405962012"] TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token") TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token" # ----------------- CONFIGURAZIONE ----------------- # Elenco località da monitorare LOCATIONS = [ {"name": "Casa (Strada Cà Toro)", "lat": 43.9356, "lon": 12.4296}, {"name": "Cerasolo", "lat": 43.9831, "lon": 12.5355}, # Frazione di San Marino, più a nord-est {"name": "Rimini", "lat": 44.0678, "lon": 12.5695}, {"name": "Riccione", "lat": 44.0000, "lon": 12.6500}, {"name": "Cattolica", "lat": 43.9600, "lon": 12.7400}, {"name": "Pesaro", "lat": 43.9100, "lon": 12.9100}, {"name": "Morciano di Romagna", "lat": 43.9200, "lon": 12.6500}, {"name": "Sassocorvaro", "lat": 43.7800, "lon": 12.5000}, {"name": "Urbino", "lat": 43.7200, "lon": 12.6400}, {"name": "Frontino", "lat": 43.7600, "lon": 12.3800}, {"name": "Carpegna", "lat": 43.7819, "lon": 12.3346}, {"name": "Pennabilli", "lat": 43.8200, "lon": 12.2600}, {"name": "Miratoio", "lat": 43.8500, "lon": 12.3000}, # Approssimazione {"name": "Sant'Agata Feltria", "lat": 43.8600, "lon": 12.2100}, {"name": "Novafeltria", "lat": 43.9000, "lon": 12.2900}, {"name": "Mercato Saraceno", "lat": 43.9500, "lon": 12.2000}, {"name": "Villa Verucchio", "lat": 44.0000, "lon": 12.4300}, {"name": "Santarcangelo di Romagna", "lat": 44.0600, "lon": 12.4500}, {"name": "Savignano sul Rubicone", "lat": 44.0900, "lon": 12.4000}, {"name": "Cesena", "lat": 44.1400, "lon": 12.2400}, {"name": "Bellaria-Igea Marina", "lat": 44.1400, "lon": 12.4800}, {"name": "Cervia", "lat": 44.2600, "lon": 12.3600}, {"name": "Roncofreddo", "lat": 44.0433, "lon": 12.3181}, {"name": "Torriana", "lat": 44.0400, "lon": 12.3800}, {"name": "Montescudo", "lat": 43.9167, "lon": 12.5333}, {"name": "Mercatino Conca", "lat": 43.8686, "lon": 12.4722}, {"name": "Macerata Feltria", "lat": 43.8033, "lon": 12.4418}, {"name": "Saludecio", "lat": 43.8750, "lon": 12.6667}, {"name": "Mondaino", "lat": 43.8500, "lon": 12.6833}, {"name": "Tavoleto", "lat": 43.8500, "lon": 12.6000}, ] # Timezone TZ = "Europe/Berlin" TZINFO = ZoneInfo(TZ) # Modello meteo: italia_meteo_arpae_icon_2i supporta snowfall e snow_depth # - snowfall: precipitazione nevosa (cm/h) # - snow_depth: spessore manto al suolo (m), include neve residua anche senza precipitazione MODEL_SNOW = "italia_meteo_arpae_icon_2i" # Soglia minima (cm) per considerare "neve presente" - evita falsi positivi da rumore/modello # Valori < 1 cm sono tracce trascurabili (dew, frost, errori numerici) - non neve reale SNOW_THRESHOLD_CM = 1.0 # File di log BASE_DIR = os.path.dirname(os.path.abspath(__file__)) LOG_FILE = os.path.join(BASE_DIR, "snow_radar.log") # ----------------- OPEN-METEO ----------------- OPEN_METEO_URL = "https://api.open-meteo.com/v1/forecast" HTTP_HEADERS = {"User-Agent": "snow-radar/1.0"} # ----------------- REVERSE GEOCODING ----------------- # Usa Nominatim (OpenStreetMap) per ottenere nomi località NOMINATIM_URL = "https://nominatim.openstreetmap.org/reverse" NOMINATIM_HEADERS = {"User-Agent": "snow-radar/1.0"} def setup_logger() -> logging.Logger: logger = logging.getLogger("snow_radar") logger.setLevel(logging.DEBUG if DEBUG else logging.INFO) logger.handlers.clear() fh = RotatingFileHandler(LOG_FILE, maxBytes=1_000_000, backupCount=5, encoding="utf-8") fh.setLevel(logging.DEBUG) fmt = logging.Formatter("%(asctime)s %(levelname)s %(message)s") fh.setFormatter(fmt) logger.addHandler(fh) if DEBUG: sh = logging.StreamHandler() sh.setLevel(logging.DEBUG) sh.setFormatter(fmt) logger.addHandler(sh) return logger LOGGER = setup_logger() # ============================================================================= # Utility # ============================================================================= def now_local() -> datetime.datetime: return datetime.datetime.now(TZINFO) def read_text_file(path: str) -> str: try: with open(path, "r", encoding="utf-8") as f: return f.read().strip() except FileNotFoundError: return "" except PermissionError: LOGGER.debug("Permission denied reading %s", path) return "" except Exception as e: LOGGER.exception("Error reading %s: %s", path, e) return "" def load_bot_token() -> str: tok = os.environ.get("TELEGRAM_BOT_TOKEN", "").strip() if tok: return tok tok = read_text_file(TOKEN_FILE_HOME) if tok: return tok tok = read_text_file(TOKEN_FILE_ETC) return tok.strip() if tok else "" def parse_time_to_local(t: str) -> datetime.datetime: dt = parser.isoparse(t) if dt.tzinfo is None: return dt.replace(tzinfo=TZINFO) return dt.astimezone(TZINFO) # ============================================================================= # Geografia # ============================================================================= def calculate_distance_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float: """Calcola distanza in km tra due punti geografici.""" from math import radians, cos, sin, asin, sqrt # Formula di Haversine R = 6371 # Raggio Terra in km dlat = radians(lat2 - lat1) dlon = radians(lon2 - lon1) a = sin(dlat/2)**2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon/2)**2 c = 2 * asin(sqrt(a)) return R * c # ============================================================================= # Open-Meteo # ============================================================================= def get_forecast(session: requests.Session, lat: float, lon: float) -> Optional[Dict]: """ Recupera previsioni meteo per una località. Inclusi: snowfall (precipitazione nevosa cm/h), snow_depth (manto al suolo m). """ params = { "latitude": lat, "longitude": lon, "hourly": "snowfall,snow_depth,weathercode", "timezone": TZ, "past_days": 7, "forecast_days": 7, "models": MODEL_SNOW, } try: r = session.get(OPEN_METEO_URL, params=params, headers=HTTP_HEADERS, timeout=(5, 25)) if r.status_code == 400: try: j = r.json() LOGGER.warning("Open-Meteo 400 (lat=%.4f lon=%.4f): %s", lat, lon, j.get("reason", j)) except Exception: LOGGER.warning("Open-Meteo 400 (lat=%.4f lon=%.4f): %s", lat, lon, r.text[:300]) return None r.raise_for_status() return r.json() except Exception as e: LOGGER.warning("Open-Meteo error (lat=%.4f lon=%.4f): %s", lat, lon, str(e)) return None def analyze_snowfall_for_location(data: Dict, now: datetime.datetime) -> Optional[Dict]: """ Analizza snowfall e snow_depth per una località. Combina: - snowfall: precipitazione nevosa (cm/h) - neve che cade - snow_depth: spessore manto al suolo (m) - neve accumulata, incluso residuo senza precipitazione Returns: Dict con valori in cm: - snow_past_12h: max(somma snowfall ultime 12h, snow_depth attuale) - snow_next_12h: somma snowfall prossime 12h - snow_next_24h: max(somma snowfall prossime 24h, snow_depth max previsto) - snow_depth_now_cm: manto attuale al suolo (cm) """ hourly = data.get("hourly", {}) or {} times = hourly.get("time", []) or [] snowfall = hourly.get("snowfall", []) or [] snow_depth_raw = hourly.get("snow_depth", []) or [] # in metri (m) if not times: return None # Converti timestamps dt_list = [parse_time_to_local(t) for t in times] # Calcola finestre temporali past_12h_start = now - datetime.timedelta(hours=12) next_12h_end = now + datetime.timedelta(hours=12) next_24h_end = now + datetime.timedelta(hours=24) snowfall_past_12h = 0.0 snowfall_next_12h = 0.0 snowfall_next_24h = 0.0 snow_depth_now_m = 0.0 snow_depth_max_past_12h_m = 0.0 snow_depth_max_next_24h_m = 0.0 # Rumore numerico: valori < 0.01 cm (snowfall) o < 0.0001 m (snow_depth) → 0 NOISE_FLOOR_SNOWFALL_CM = 0.01 NOISE_FLOOR_SNOW_DEPTH_M = 0.0001 for i, dt in enumerate(dt_list): snow_val = float(snowfall[i]) if i < len(snowfall) and snowfall[i] is not None else 0.0 depth_val = float(snow_depth_raw[i]) if i < len(snow_depth_raw) and snow_depth_raw[i] is not None else 0.0 if snow_val < NOISE_FLOOR_SNOWFALL_CM: snow_val = 0.0 if depth_val < NOISE_FLOOR_SNOW_DEPTH_M: depth_val = 0.0 # Snowfall nelle finestre temporali if dt < now and dt >= past_12h_start: snowfall_past_12h += snow_val snow_depth_max_past_12h_m = max(snow_depth_max_past_12h_m, depth_val) if now <= dt < next_12h_end: snowfall_next_12h += snow_val if now <= dt < next_24h_end: snowfall_next_24h += snow_val snow_depth_max_next_24h_m = max(snow_depth_max_next_24h_m, depth_val) # snow_depth attuale: usa valore più vicino a "now" (ultima ora passata o prima futura) if dt <= now: snow_depth_now_m = depth_val # snow_depth da m a cm snow_depth_now_cm = snow_depth_now_m * 100.0 snow_depth_max_past_12h_cm = snow_depth_max_past_12h_m * 100.0 snow_depth_max_next_24h_cm = snow_depth_max_next_24h_m * 100.0 # Combina precipitazione + manto: per passato usa max(somma precipitazione, manto attuale) # per futuro usa max(somma precipitazione, manto max previsto) snow_past_12h = max(snowfall_past_12h, snow_depth_now_cm) snow_next_24h = max(snowfall_next_24h, snow_depth_max_next_24h_cm) return { "snow_past_12h": snow_past_12h, "snow_next_12h": snowfall_next_12h, "snow_next_24h": snow_next_24h, "snow_depth_now_cm": snow_depth_now_cm, } # ============================================================================= # Mappa Grafica # ============================================================================= def generate_snow_map(results: List[Dict], center_lat: float, center_lon: float, output_path: str, data_field: str = "snow_next_24h", title_suffix: str = "") -> bool: """ Genera una mappa grafica con punti colorati in base all'accumulo di neve. Args: results: Lista di dict con 'name', 'lat', 'lon', 'snow_past_12h', 'snow_next_12h', 'snow_next_24h' center_lat: Latitudine centro (San Marino) center_lon: Longitudine centro (San Marino) output_path: Percorso file output PNG data_field: Campo da usare per i colori ('snow_past_12h' o 'snow_next_24h') title_suffix: Suffisso da aggiungere al titolo Returns: True se generata con successo, False altrimenti """ try: import matplotlib matplotlib.use('Agg') # Backend senza GUI import matplotlib.pyplot as plt import matplotlib.patches as mpatches from matplotlib.colors import LinearSegmentedColormap import numpy as np except ImportError as e: LOGGER.warning("matplotlib non disponibile: %s. Mappa non generata.", e) return False # Prova a importare contextily per mappa di sfondo try: import contextily as ctx CONTEXTILY_AVAILABLE = True except ImportError: CONTEXTILY_AVAILABLE = False LOGGER.warning("contextily non disponibile. Mappa generata senza sfondo geografico.") if not results: return False # Estrai valori dal campo specificato totals = [r.get(data_field, 0.0) for r in results] max_total = max(totals) if totals else 1.0 min_total = min(totals) if totals else 0.0 # Evita vmin==vmax (divisione per zero nel colormap) - tutti 0 → scala 0..1 if max_total <= min_total: max_total = max(min_total + 0.1, 1.0) # Estrai coordinate lats = [r["lat"] for r in results] lons = [r["lon"] for r in results] names = [r["name"] for r in results] # Crea figura fig, ax = plt.subplots(figsize=(14, 12)) fig.patch.set_facecolor('white') # Limiti fissi della mappa (più zoomata) lat_min, lat_max = 43.7, 44.3 lon_min, lon_max = 12.1, 12.8 # Configura assi PRIMA di aggiungere lo sfondo ax.set_xlim(lon_min, lon_max) ax.set_ylim(lat_min, lat_max) ax.set_aspect('equal', adjustable='box') # Aggiungi mappa di sfondo OpenStreetMap se disponibile if CONTEXTILY_AVAILABLE: try: # Aggiungi tile OpenStreetMap (contextily gestisce automaticamente la conversione) ctx.add_basemap(ax, crs='EPSG:4326', source=ctx.providers.OpenStreetMap.Mapnik, alpha=0.6, attribution_size=6) LOGGER.debug("Mappa OpenStreetMap aggiunta come sfondo") except Exception as e: LOGGER.warning("Errore aggiunta mappa sfondo: %s. Continuo senza sfondo.", e) # Non reimpostare CONTEXTILY_AVAILABLE qui, solo logga l'errore # Disegna punti con colore basato su accumulo totale # Colori: blu (poco) -> verde -> giallo -> arancione -> rosso (molto) cmap = LinearSegmentedColormap.from_list('snow', ['#1E90FF', '#00CED1', '#32CD32', '#FFD700', '#FF8C00', '#FF4500', '#8B0000']) scatter = ax.scatter(lons, lats, c=totals, s=250, cmap=cmap, vmin=min_total, vmax=max_total, edgecolors='black', linewidths=2, alpha=0.85, zorder=5) # Posizionamento personalizzato per etichette specifiche label_positions = { "Casa (Strada Cà Toro)": (-20, 20), # Più alto e più a sx "Cervia": (0, 20), # Più in alto "Savignano sul Rubicone": (-15, 15), # Alto a sx "Rimini": (0, 20), # Più in alto "Santarcangelo di Romagna": (0, -20), # Più in basso "Riccione": (0, -20), # Più in basso "Morciano di Romagna": (0, -20), # Più in basso "Miratoio": (0, -20), # Più in basso "Carpegna": (-20, -25), # Più in basso e più a sx "Pennabilli": (0, -20), # Più in basso "Mercato Saraceno": (0, 20), # Più in alto "Sant'Agata Feltria": (-20, 15), # Più a sx "Villa Verucchio": (0, -25), # Più in basso "Roncofreddo": (-15, 15), # Alto a sx "Torriana": (-15, 15), # Alto a sx "Cerasolo": (15, 0), # Più a dx "Mercatino Conca": (0, -20), # Più in basso "Novafeltria": (10, 0), # Leggermente più a dx "Urbino": (0, 20), # Più in alto "Saludecio": (15, -15), # Più in basso "Macerata Feltria": (20, 0), # Più a dx "Mondaino": (15, -15), # Basso a dx "Tavoleto": (15, -15), # Basso a dx } # Offset di default per altre località default_offsets = [ (8, 8), (8, -12), (-12, 8), (-12, -12), # 4 direzioni base (0, 15), (0, -15), (15, 0), (-15, 0), # 4 direzioni intermedie (10, 10), (-10, 10), (10, -10), (-10, -10) # Diagonali ] for i, (lon, lat, name, total) in enumerate(zip(lons, lats, names, totals)): # Usa posizionamento personalizzato se disponibile, altrimenti offset ciclico if name in label_positions: xytext = label_positions[name] else: offset_idx = i % len(default_offsets) xytext = default_offsets[offset_idx] # Font size basato su importanza fontsize = 9 if total > 5 or name in ["Cerasolo", "Carpegna", "Rimini", "Pesaro"] else 8 # Salta Casa qui, la gestiamo separatamente if name == "Casa (Strada Cà Toro)": continue ax.annotate(name, (lon, lat), xytext=xytext, textcoords='offset points', fontsize=fontsize, fontweight='bold', bbox=dict(boxstyle='round,pad=0.4', facecolor='white', alpha=0.9, edgecolor='black', linewidth=1), zorder=6) # Aggiungi punto Casa (Strada Cà Toro) - più grande e visibile, etichetta solo "Casa" casa_lat = 43.9356 casa_lon = 12.4296 casa_name = "Casa (Strada Cà Toro)" casa_value = next((r.get(data_field, 0.0) for r in results if r.get("name") == casa_name), 0.0) casa_color = cmap((casa_value - min_total) / (max_total - min_total) if max_total > min_total else 0.5) ax.scatter([casa_lon], [casa_lat], s=350, c=[casa_color], edgecolors='black', linewidths=2.5, zorder=7, marker='s') # Quadrato per Casa ax.annotate('Casa', (casa_lon, casa_lat), xytext=(-20, 20), textcoords='offset points', # Più alto e più a sx fontsize=11, fontweight='bold', bbox=dict(boxstyle='round,pad=0.6', facecolor='white', alpha=0.95, edgecolor='black', linewidth=2), zorder=8) # Colorbar (spostata a destra) - label dinamica in base al campo label_text = 'Accumulo Neve (cm)' if data_field == "snow_past_12h": label_text = 'Accumulo Neve Ultime 12h (cm)' elif data_field == "snow_next_24h": label_text = 'Accumulo Neve Prossime 24h (cm)' cbar = plt.colorbar(scatter, ax=ax, label=label_text, shrink=0.7, pad=0.02, location='right') cbar.ax.set_ylabel(label_text, fontsize=11, fontweight='bold') cbar.ax.tick_params(labelsize=9) # Configura assi (etichette) ax.set_xlabel('Longitudine (°E)', fontsize=12, fontweight='bold') ax.set_ylabel('Latitudine (°N)', fontsize=12, fontweight='bold') title = f'❄️ SNOW RADAR - Analisi Neve 30km da San Marino{title_suffix}' ax.set_title(title, fontsize=15, fontweight='bold', pad=20) # Griglia solo se non c'è mappa di sfondo if not CONTEXTILY_AVAILABLE: ax.grid(True, alpha=0.3, linestyle='--', zorder=1) # Legenda spostata in basso a sinistra (non si sovrappone ai dati) legend_elements = [ mpatches.Patch(facecolor='#1E90FF', label='0-1 cm'), mpatches.Patch(facecolor='#32CD32', label='1-3 cm'), mpatches.Patch(facecolor='#FFD700', label='3-5 cm'), mpatches.Patch(facecolor='#FF8C00', label='5-10 cm'), mpatches.Patch(facecolor='#FF4500', label='10-20 cm'), mpatches.Patch(facecolor='#8B0000', label='>20 cm'), ] ax.legend(handles=legend_elements, loc='lower left', fontsize=10, framealpha=0.95, edgecolor='black', fancybox=True, shadow=True) # Info timestamp spostata in alto a destra (Località con neve = solo quelle con neve sopra soglia) now = now_local() num_with_snow = sum(1 for r in results if r.get("has_snow", False)) info_text = f"Aggiornamento: {now.strftime('%d/%m/%Y %H:%M')}\nLocalità con neve: {num_with_snow}" ax.text(0.98, 0.98, info_text, transform=ax.transAxes, fontsize=9, verticalalignment='top', horizontalalignment='right', bbox=dict(boxstyle='round,pad=0.5', facecolor='white', alpha=0.9, edgecolor='gray', linewidth=1.5), zorder=10) plt.tight_layout() # Salva try: plt.savefig(output_path, dpi=150, bbox_inches='tight', facecolor='white') plt.close(fig) LOGGER.info("Mappa salvata: %s", output_path) return True except Exception as e: LOGGER.exception("Errore salvataggio mappa: %s", e) plt.close(fig) return False def telegram_send_photo(photo_path: str, caption: str, chat_ids: Optional[List[str]] = None) -> bool: """ Invia foto via Telegram API. Args: photo_path: Percorso file immagine caption: Didascalia foto (max 1024 caratteri) chat_ids: Lista chat IDs (default: TELEGRAM_CHAT_IDS) Returns: True se inviata con successo, False altrimenti """ token = load_bot_token() if not token: LOGGER.warning("Telegram token missing: photo not sent.") return False if not os.path.exists(photo_path): LOGGER.error("File foto non trovato: %s", photo_path) return False if chat_ids is None: chat_ids = TELEGRAM_CHAT_IDS url = f"https://api.telegram.org/bot{token}/sendPhoto" # Limite Telegram per caption: 1024 caratteri if len(caption) > 1024: caption = caption[:1021] + "..." sent_ok = False with requests.Session() as s: for chat_id in chat_ids: try: with open(photo_path, 'rb') as photo_file: files = {'photo': photo_file} data = { 'chat_id': chat_id, 'caption': caption, 'parse_mode': 'Markdown' } resp = s.post(url, files=files, data=data, timeout=30) if resp.status_code == 200: sent_ok = True LOGGER.info("Foto inviata a chat_id=%s", chat_id) else: LOGGER.error("Telegram error chat_id=%s status=%s body=%s", chat_id, resp.status_code, resp.text[:500]) time.sleep(0.5) except Exception as e: LOGGER.exception("Telegram exception chat_id=%s err=%s", chat_id, e) return sent_ok # ============================================================================= # Telegram # ============================================================================= def telegram_send_markdown(message_md: str, chat_ids: Optional[List[str]] = None) -> bool: """Invia messaggio Markdown su Telegram. Divide in più messaggi se troppo lungo.""" token = load_bot_token() if not token: LOGGER.warning("Telegram token missing: message not sent.") return False if chat_ids is None: chat_ids = TELEGRAM_CHAT_IDS # Telegram limite: 4096 caratteri per messaggio MAX_MESSAGE_LENGTH = 4000 # Lascia margine per encoding url = f"https://api.telegram.org/bot{token}/sendMessage" # Se il messaggio è troppo lungo, dividilo if len(message_md) <= MAX_MESSAGE_LENGTH: messages = [message_md] else: # Dividi per righe, mantenendo l'header nel primo messaggio lines = message_md.split('\n') messages = [] current_msg = [] current_len = 0 # Header (prime righe fino a "*Riepilogo per località*") header_lines = [] header_end_idx = 0 for i, line in enumerate(lines): if "*Riepilogo per località" in line: header_end_idx = i + 1 break header_lines.append(line) header = '\n'.join(header_lines) header_len = len(header) # Primo messaggio: header + prime località current_msg = header_lines.copy() current_len = header_len for i in range(header_end_idx, len(lines)): line = lines[i] line_len = len(line) + 1 # +1 per \n if current_len + line_len > MAX_MESSAGE_LENGTH: # Chiudi messaggio corrente messages.append('\n'.join(current_msg)) # Nuovo messaggio (solo continuazione) current_msg = [line] current_len = line_len else: current_msg.append(line) current_len += line_len # Aggiungi ultimo messaggio if current_msg: messages.append('\n'.join(current_msg)) sent_ok = False with requests.Session() as s: for chat_id in chat_ids: for msg_idx, msg_text in enumerate(messages): payload = { "chat_id": chat_id, "text": msg_text, "parse_mode": "Markdown", "disable_web_page_preview": True, } try: resp = s.post(url, json=payload, timeout=15) if resp.status_code == 200: sent_ok = True else: LOGGER.error("Telegram error chat_id=%s status=%s body=%s", chat_id, resp.status_code, resp.text[:500]) time.sleep(0.25) except Exception as e: LOGGER.exception("Telegram exception chat_id=%s err=%s", chat_id, e) return sent_ok # ============================================================================= # Main # ============================================================================= def main(chat_ids: Optional[List[str]] = None, debug_mode: bool = False, chat_id: Optional[str] = None) -> None: LOGGER.info("--- Snow Radar ---") # Se chat_id è specificato, usa quello (per chiamate da Telegram) if chat_id: chat_ids = [chat_id] elif debug_mode and not chat_ids: # In debug mode, default al primo chat ID (admin) chat_ids = [TELEGRAM_CHAT_IDS[0]] now = now_local() # Centro: San Marino (per calcolo distanze) CENTER_LAT = 43.9356 CENTER_LON = 12.4296 # Analizza località predefinite LOGGER.info("Analisi %d località predefinite...", len(LOCATIONS)) with requests.Session() as session: configure_open_meteo_session(session, headers=HTTP_HEADERS) results = [] for i, loc in enumerate(LOCATIONS): # Calcola distanza da San Marino distance_km = calculate_distance_km(CENTER_LAT, CENTER_LON, loc["lat"], loc["lon"]) LOGGER.debug("Analizzando località %d/%d: %s (%.2f km)", i+1, len(LOCATIONS), loc["name"], distance_km) data = get_forecast(session, loc["lat"], loc["lon"]) if not data: continue snow_analysis = analyze_snowfall_for_location(data, now) if not snow_analysis: continue # Aggiungi sempre Casa, anche se non c'è neve # Per le altre località, aggiungi solo se c'è neve sopra soglia (precipitazione o manto residuo) is_casa = loc["name"] == "Casa (Strada Cà Toro)" has_snow = ( snow_analysis["snow_past_12h"] >= SNOW_THRESHOLD_CM or snow_analysis["snow_next_12h"] >= SNOW_THRESHOLD_CM or snow_analysis["snow_next_24h"] >= SNOW_THRESHOLD_CM ) if is_casa or has_snow: results.append({ "name": loc["name"], "lat": loc["lat"], "lon": loc["lon"], "distance_km": distance_km, "has_snow": has_snow, **snow_analysis }) # Rate limiting per Open-Meteo time.sleep(0.1) if not results: LOGGER.info("Nessuna neve rilevata nelle località monitorate") if debug_mode: message = "❄️ *SNOW RADAR*\n\nNessuna neve rilevata nelle località monitorate." telegram_send_markdown(message, chat_ids=chat_ids) return # Genera e invia DUE mappe separate now_str = now.strftime('%d/%m/%Y %H:%M') num_with_snow = sum(1 for r in results if r.get("has_snow", False)) # 1. Mappa snowfall passato (12h precedenti) map_path_past = os.path.join(BASE_DIR, "snow_radar_past.png") map_generated_past = generate_snow_map(results, CENTER_LAT, CENTER_LON, map_path_past, data_field="snow_past_12h", title_suffix=" - Ultime 12h") if map_generated_past: caption_past = ( f"❄️ *SNOW RADAR - Ultime 12h*\n" f"📍 Centro: San Marino\n" f"🕒 {now_str}\n" f"📊 Località con neve: {num_with_snow}/{len(LOCATIONS)}" ) telegram_send_photo(map_path_past, caption_past, chat_ids=chat_ids) # Pulisci file temporaneo try: if os.path.exists(map_path_past): os.remove(map_path_past) except Exception: pass # 2. Mappa snowfall futuro (24h successive) map_path_future = os.path.join(BASE_DIR, "snow_radar_future.png") map_generated_future = generate_snow_map(results, CENTER_LAT, CENTER_LON, map_path_future, data_field="snow_next_24h", title_suffix=" - Prossime 24h") if map_generated_future: caption_future = ( f"❄️ *SNOW RADAR - Prossime 24h*\n" f"📍 Centro: San Marino\n" f"🕒 {now_str}\n" f"📊 Località con neve: {num_with_snow}/{len(LOCATIONS)}" ) telegram_send_photo(map_path_future, caption_future, chat_ids=chat_ids) # Pulisci file temporaneo try: if os.path.exists(map_path_future): os.remove(map_path_future) except Exception: pass if map_generated_past or map_generated_future: LOGGER.info("Mappe inviate con successo (%d località, passato: %s, futuro: %s)", len(results), "sì" if map_generated_past else "no", "sì" if map_generated_future else "no") else: LOGGER.error("Errore generazione mappe") if __name__ == "__main__": arg_parser = argparse.ArgumentParser(description="Snow Radar - Analisi neve in griglia 30km") arg_parser.add_argument("--debug", action="store_true", help="Invia messaggi solo all'admin (chat ID: %s)" % TELEGRAM_CHAT_IDS[0]) arg_parser.add_argument("--chat_id", type=str, help="Chat ID specifico per invio messaggio (override debug mode)") args = arg_parser.parse_args() # Se --chat_id è specificato, usa quello; altrimenti usa logica debug chat_id = args.chat_id if args.chat_id else None chat_ids = None if chat_id else ([TELEGRAM_CHAT_IDS[0]] if args.debug else None) main(chat_ids=chat_ids, debug_mode=args.debug, chat_id=chat_id)