Files

750 lines
29 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import datetime
import json
import logging
import os
import time
from logging.handlers import RotatingFileHandler
from typing import Dict, List, Optional, Tuple
from zoneinfo import ZoneInfo
import requests
from dateutil import parser
from open_meteo_client import configure_open_meteo_session
# =============================================================================
# snow_radar.py
#
# Scopo:
# Analizza la nevicata in una griglia di località in un raggio di 40km da San Marino.
# Per ciascuna località mostra:
# - Nome della località
# - Somma dello snowfall orario nelle 12 ore precedenti
# - Somma dello snowfall previsto nelle 12 ore successive
# - Somma dello snowfall previsto nelle 24 ore successive
#
# Modello meteo:
# meteofrance_seamless (AROME) per dati dettagliati
#
# Token Telegram:
# Nessun token in chiaro. Lettura in ordine:
# 1) env TELEGRAM_BOT_TOKEN
# 2) ~/.telegram_dpc_bot_token
# 3) /etc/telegram_dpc_bot_token
#
# Debug:
# python3 snow_radar.py --debug
#
# Log:
# ./snow_radar.log (stessa cartella dello script)
# =============================================================================
DEBUG = os.environ.get("DEBUG", "0").strip() == "1"
# ----------------- TELEGRAM -----------------
TELEGRAM_CHAT_IDS = ["64463169", "24827341", "132455422", "5405962012"]
TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token")
TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token"
# ----------------- CONFIGURAZIONE -----------------
# Elenco località da monitorare
LOCATIONS = [
{"name": "Casa (Strada Cà Toro)", "lat": 43.9356, "lon": 12.4296},
{"name": "Cerasolo", "lat": 43.9831, "lon": 12.5355}, # Frazione di San Marino, più a nord-est
{"name": "Rimini", "lat": 44.0678, "lon": 12.5695},
{"name": "Riccione", "lat": 44.0000, "lon": 12.6500},
{"name": "Cattolica", "lat": 43.9600, "lon": 12.7400},
{"name": "Pesaro", "lat": 43.9100, "lon": 12.9100},
{"name": "Morciano di Romagna", "lat": 43.9200, "lon": 12.6500},
{"name": "Sassocorvaro", "lat": 43.7800, "lon": 12.5000},
{"name": "Urbino", "lat": 43.7200, "lon": 12.6400},
{"name": "Frontino", "lat": 43.7600, "lon": 12.3800},
{"name": "Carpegna", "lat": 43.7819, "lon": 12.3346},
{"name": "Pennabilli", "lat": 43.8200, "lon": 12.2600},
{"name": "Miratoio", "lat": 43.8500, "lon": 12.3000}, # Approssimazione
{"name": "Sant'Agata Feltria", "lat": 43.8600, "lon": 12.2100},
{"name": "Novafeltria", "lat": 43.9000, "lon": 12.2900},
{"name": "Mercato Saraceno", "lat": 43.9500, "lon": 12.2000},
{"name": "Villa Verucchio", "lat": 44.0000, "lon": 12.4300},
{"name": "Santarcangelo di Romagna", "lat": 44.0600, "lon": 12.4500},
{"name": "Savignano sul Rubicone", "lat": 44.0900, "lon": 12.4000},
{"name": "Cesena", "lat": 44.1400, "lon": 12.2400},
{"name": "Bellaria-Igea Marina", "lat": 44.1400, "lon": 12.4800},
{"name": "Cervia", "lat": 44.2600, "lon": 12.3600},
{"name": "Roncofreddo", "lat": 44.0433, "lon": 12.3181},
{"name": "Torriana", "lat": 44.0400, "lon": 12.3800},
{"name": "Montescudo", "lat": 43.9167, "lon": 12.5333},
{"name": "Mercatino Conca", "lat": 43.8686, "lon": 12.4722},
{"name": "Macerata Feltria", "lat": 43.8033, "lon": 12.4418},
{"name": "Saludecio", "lat": 43.8750, "lon": 12.6667},
{"name": "Mondaino", "lat": 43.8500, "lon": 12.6833},
{"name": "Tavoleto", "lat": 43.8500, "lon": 12.6000},
]
# Timezone
TZ = "Europe/Berlin"
TZINFO = ZoneInfo(TZ)
# Modello meteo
MODEL_AROME = "meteofrance_seamless"
# File di log
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
LOG_FILE = os.path.join(BASE_DIR, "snow_radar.log")
# ----------------- OPEN-METEO -----------------
OPEN_METEO_URL = "https://api.open-meteo.com/v1/forecast"
HTTP_HEADERS = {"User-Agent": "snow-radar/1.0"}
# ----------------- REVERSE GEOCODING -----------------
# Usa Nominatim (OpenStreetMap) per ottenere nomi località
NOMINATIM_URL = "https://nominatim.openstreetmap.org/reverse"
NOMINATIM_HEADERS = {"User-Agent": "snow-radar/1.0"}
def setup_logger() -> logging.Logger:
logger = logging.getLogger("snow_radar")
logger.setLevel(logging.DEBUG if DEBUG else logging.INFO)
logger.handlers.clear()
fh = RotatingFileHandler(LOG_FILE, maxBytes=1_000_000, backupCount=5, encoding="utf-8")
fh.setLevel(logging.DEBUG)
fmt = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
fh.setFormatter(fmt)
logger.addHandler(fh)
if DEBUG:
sh = logging.StreamHandler()
sh.setLevel(logging.DEBUG)
sh.setFormatter(fmt)
logger.addHandler(sh)
return logger
LOGGER = setup_logger()
# =============================================================================
# Utility
# =============================================================================
def now_local() -> datetime.datetime:
return datetime.datetime.now(TZINFO)
def read_text_file(path: str) -> str:
try:
with open(path, "r", encoding="utf-8") as f:
return f.read().strip()
except FileNotFoundError:
return ""
except PermissionError:
LOGGER.debug("Permission denied reading %s", path)
return ""
except Exception as e:
LOGGER.exception("Error reading %s: %s", path, e)
return ""
def load_bot_token() -> str:
tok = os.environ.get("TELEGRAM_BOT_TOKEN", "").strip()
if tok:
return tok
tok = read_text_file(TOKEN_FILE_HOME)
if tok:
return tok
tok = read_text_file(TOKEN_FILE_ETC)
return tok.strip() if tok else ""
def parse_time_to_local(t: str) -> datetime.datetime:
dt = parser.isoparse(t)
if dt.tzinfo is None:
return dt.replace(tzinfo=TZINFO)
return dt.astimezone(TZINFO)
# =============================================================================
# Geografia
# =============================================================================
def calculate_distance_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""Calcola distanza in km tra due punti geografici."""
from math import radians, cos, sin, asin, sqrt
# Formula di Haversine
R = 6371 # Raggio Terra in km
dlat = radians(lat2 - lat1)
dlon = radians(lon2 - lon1)
a = sin(dlat/2)**2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon/2)**2
c = 2 * asin(sqrt(a))
return R * c
# =============================================================================
# Open-Meteo
# =============================================================================
def get_forecast(session: requests.Session, lat: float, lon: float) -> Optional[Dict]:
"""
Recupera previsioni meteo per una località.
"""
params = {
"latitude": lat,
"longitude": lon,
"hourly": "snowfall,weathercode",
"timezone": TZ,
"forecast_days": 2,
"models": MODEL_AROME,
}
try:
r = session.get(OPEN_METEO_URL, params=params, headers=HTTP_HEADERS, timeout=(5, 25))
if r.status_code == 400:
try:
j = r.json()
LOGGER.warning("Open-Meteo 400 (lat=%.4f lon=%.4f): %s", lat, lon, j.get("reason", j))
except Exception:
LOGGER.warning("Open-Meteo 400 (lat=%.4f lon=%.4f): %s", lat, lon, r.text[:300])
return None
r.raise_for_status()
return r.json()
except Exception as e:
LOGGER.warning("Open-Meteo error (lat=%.4f lon=%.4f): %s", lat, lon, str(e))
return None
def analyze_snowfall_for_location(data: Dict, now: datetime.datetime) -> Optional[Dict]:
"""
Analizza snowfall per una località.
Nota: Open-Meteo fornisce principalmente dati futuri. Per i dati passati,
includiamo anche le ore appena passate se disponibili nei dati hourly.
Returns:
Dict con:
- snow_past_12h: somma snowfall ultime 12 ore (cm) - se disponibile nei dati
- snow_next_12h: somma snowfall prossime 12 ore (cm)
- snow_next_24h: somma snowfall prossime 24 ore (cm)
"""
hourly = data.get("hourly", {}) or {}
times = hourly.get("time", []) or []
snowfall = hourly.get("snowfall", []) or []
if not times or not snowfall:
return None
# Converti timestamps
dt_list = [parse_time_to_local(t) for t in times]
# Calcola finestre temporali
past_12h_start = now - datetime.timedelta(hours=12)
next_12h_end = now + datetime.timedelta(hours=12)
next_24h_end = now + datetime.timedelta(hours=24)
snow_past_12h = 0.0
snow_next_12h = 0.0
snow_next_24h = 0.0
for i, dt in enumerate(dt_list):
snow_val = float(snowfall[i]) if i < len(snowfall) and snowfall[i] is not None else 0.0
# Ultime 12 ore (passato) - solo se i dati includono il passato
if dt < now and dt >= past_12h_start:
snow_past_12h += snow_val
# Prossime 12 ore
if now <= dt < next_12h_end:
snow_next_12h += snow_val
# Prossime 24 ore
if now <= dt < next_24h_end:
snow_next_24h += snow_val
return {
"snow_past_12h": snow_past_12h,
"snow_next_12h": snow_next_12h,
"snow_next_24h": snow_next_24h,
}
# =============================================================================
# Mappa Grafica
# =============================================================================
def generate_snow_map(results: List[Dict], center_lat: float, center_lon: float, output_path: str,
data_field: str = "snow_next_24h", title_suffix: str = "") -> bool:
"""
Genera una mappa grafica con punti colorati in base all'accumulo di neve.
Args:
results: Lista di dict con 'name', 'lat', 'lon', 'snow_past_12h', 'snow_next_12h', 'snow_next_24h'
center_lat: Latitudine centro (San Marino)
center_lon: Longitudine centro (San Marino)
output_path: Percorso file output PNG
data_field: Campo da usare per i colori ('snow_past_12h' o 'snow_next_24h')
title_suffix: Suffisso da aggiungere al titolo
Returns:
True se generata con successo, False altrimenti
"""
try:
import matplotlib
matplotlib.use('Agg') # Backend senza GUI
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from matplotlib.colors import LinearSegmentedColormap
import numpy as np
except ImportError as e:
LOGGER.warning("matplotlib non disponibile: %s. Mappa non generata.", e)
return False
# Prova a importare contextily per mappa di sfondo
try:
import contextily as ctx
CONTEXTILY_AVAILABLE = True
except ImportError:
CONTEXTILY_AVAILABLE = False
LOGGER.warning("contextily non disponibile. Mappa generata senza sfondo geografico.")
if not results:
return False
# Estrai valori dal campo specificato
totals = [r.get(data_field, 0.0) for r in results]
max_total = max(totals) if totals else 1.0
min_total = min(totals) if totals else 0.0
# Estrai coordinate
lats = [r["lat"] for r in results]
lons = [r["lon"] for r in results]
names = [r["name"] for r in results]
# Crea figura
fig, ax = plt.subplots(figsize=(14, 12))
fig.patch.set_facecolor('white')
# Limiti fissi della mappa (più zoomata)
lat_min, lat_max = 43.7, 44.3
lon_min, lon_max = 12.1, 12.8
# Configura assi PRIMA di aggiungere lo sfondo
ax.set_xlim(lon_min, lon_max)
ax.set_ylim(lat_min, lat_max)
ax.set_aspect('equal', adjustable='box')
# Aggiungi mappa di sfondo OpenStreetMap se disponibile
if CONTEXTILY_AVAILABLE:
try:
# Aggiungi tile OpenStreetMap (contextily gestisce automaticamente la conversione)
ctx.add_basemap(ax, crs='EPSG:4326', source=ctx.providers.OpenStreetMap.Mapnik,
alpha=0.6, attribution_size=6)
LOGGER.debug("Mappa OpenStreetMap aggiunta come sfondo")
except Exception as e:
LOGGER.warning("Errore aggiunta mappa sfondo: %s. Continuo senza sfondo.", e)
# Non reimpostare CONTEXTILY_AVAILABLE qui, solo logga l'errore
# Disegna punti con colore basato su accumulo totale
# Colori: blu (poco) -> verde -> giallo -> arancione -> rosso (molto)
cmap = LinearSegmentedColormap.from_list('snow',
['#1E90FF', '#00CED1', '#32CD32', '#FFD700', '#FF8C00', '#FF4500', '#8B0000'])
scatter = ax.scatter(lons, lats, c=totals, s=250, cmap=cmap,
vmin=min_total, vmax=max_total,
edgecolors='black', linewidths=2, alpha=0.85, zorder=5)
# Posizionamento personalizzato per etichette specifiche
label_positions = {
"Casa (Strada Cà Toro)": (-20, 20), # Più alto e più a sx
"Cervia": (0, 20), # Più in alto
"Savignano sul Rubicone": (-15, 15), # Alto a sx
"Rimini": (0, 20), # Più in alto
"Santarcangelo di Romagna": (0, -20), # Più in basso
"Riccione": (0, -20), # Più in basso
"Morciano di Romagna": (0, -20), # Più in basso
"Miratoio": (0, -20), # Più in basso
"Carpegna": (-20, -25), # Più in basso e più a sx
"Pennabilli": (0, -20), # Più in basso
"Mercato Saraceno": (0, 20), # Più in alto
"Sant'Agata Feltria": (-20, 15), # Più a sx
"Villa Verucchio": (0, -25), # Più in basso
"Roncofreddo": (-15, 15), # Alto a sx
"Torriana": (-15, 15), # Alto a sx
"Cerasolo": (15, 0), # Più a dx
"Mercatino Conca": (0, -20), # Più in basso
"Novafeltria": (10, 0), # Leggermente più a dx
"Urbino": (0, 20), # Più in alto
"Saludecio": (15, -15), # Più in basso
"Macerata Feltria": (20, 0), # Più a dx
"Mondaino": (15, -15), # Basso a dx
"Tavoleto": (15, -15), # Basso a dx
}
# Offset di default per altre località
default_offsets = [
(8, 8), (8, -12), (-12, 8), (-12, -12), # 4 direzioni base
(0, 15), (0, -15), (15, 0), (-15, 0), # 4 direzioni intermedie
(10, 10), (-10, 10), (10, -10), (-10, -10) # Diagonali
]
for i, (lon, lat, name, total) in enumerate(zip(lons, lats, names, totals)):
# Usa posizionamento personalizzato se disponibile, altrimenti offset ciclico
if name in label_positions:
xytext = label_positions[name]
else:
offset_idx = i % len(default_offsets)
xytext = default_offsets[offset_idx]
# Font size basato su importanza
fontsize = 9 if total > 5 or name in ["Cerasolo", "Carpegna", "Rimini", "Pesaro"] else 8
# Salta Casa qui, la gestiamo separatamente
if name == "Casa (Strada Cà Toro)":
continue
ax.annotate(name, (lon, lat), xytext=xytext, textcoords='offset points',
fontsize=fontsize, fontweight='bold',
bbox=dict(boxstyle='round,pad=0.4', facecolor='white', alpha=0.9,
edgecolor='black', linewidth=1),
zorder=6)
# Aggiungi punto Casa (Strada Cà Toro) - più grande e visibile, etichetta solo "Casa"
casa_lat = 43.9356
casa_lon = 12.4296
casa_name = "Casa (Strada Cà Toro)"
casa_value = next((r.get(data_field, 0.0) for r in results if r.get("name") == casa_name), 0.0)
casa_color = cmap((casa_value - min_total) / (max_total - min_total) if max_total > min_total else 0.5)
ax.scatter([casa_lon], [casa_lat], s=350, c=[casa_color],
edgecolors='black', linewidths=2.5, zorder=7, marker='s') # Quadrato per Casa
ax.annotate('Casa', (casa_lon, casa_lat),
xytext=(-20, 20), textcoords='offset points', # Più alto e più a sx
fontsize=11, fontweight='bold',
bbox=dict(boxstyle='round,pad=0.6', facecolor='white', alpha=0.95,
edgecolor='black', linewidth=2),
zorder=8)
# Colorbar (spostata a destra) - label dinamica in base al campo
label_text = 'Accumulo Neve (cm)'
if data_field == "snow_past_12h":
label_text = 'Accumulo Neve Ultime 12h (cm)'
elif data_field == "snow_next_24h":
label_text = 'Accumulo Neve Prossime 24h (cm)'
cbar = plt.colorbar(scatter, ax=ax, label=label_text,
shrink=0.7, pad=0.02, location='right')
cbar.ax.set_ylabel(label_text, fontsize=11, fontweight='bold')
cbar.ax.tick_params(labelsize=9)
# Configura assi (etichette)
ax.set_xlabel('Longitudine (°E)', fontsize=12, fontweight='bold')
ax.set_ylabel('Latitudine (°N)', fontsize=12, fontweight='bold')
title = f'❄️ SNOW RADAR - Analisi Neve 30km da San Marino{title_suffix}'
ax.set_title(title, fontsize=15, fontweight='bold', pad=20)
# Griglia solo se non c'è mappa di sfondo
if not CONTEXTILY_AVAILABLE:
ax.grid(True, alpha=0.3, linestyle='--', zorder=1)
# Legenda spostata in basso a sinistra (non si sovrappone ai dati)
legend_elements = [
mpatches.Patch(facecolor='#1E90FF', label='0-1 cm'),
mpatches.Patch(facecolor='#32CD32', label='1-3 cm'),
mpatches.Patch(facecolor='#FFD700', label='3-5 cm'),
mpatches.Patch(facecolor='#FF8C00', label='5-10 cm'),
mpatches.Patch(facecolor='#FF4500', label='10-20 cm'),
mpatches.Patch(facecolor='#8B0000', label='>20 cm'),
]
ax.legend(handles=legend_elements, loc='lower left', fontsize=10,
framealpha=0.95, edgecolor='black', fancybox=True, shadow=True)
# Info timestamp spostata in alto a destra
now = now_local()
info_text = f"Aggiornamento: {now.strftime('%d/%m/%Y %H:%M')}\nLocalità con neve: {len(results)}"
ax.text(0.98, 0.98, info_text, transform=ax.transAxes,
fontsize=9, verticalalignment='top', horizontalalignment='right',
bbox=dict(boxstyle='round,pad=0.5', facecolor='white', alpha=0.9,
edgecolor='gray', linewidth=1.5),
zorder=10)
plt.tight_layout()
# Salva
try:
plt.savefig(output_path, dpi=150, bbox_inches='tight', facecolor='white')
plt.close(fig)
LOGGER.info("Mappa salvata: %s", output_path)
return True
except Exception as e:
LOGGER.exception("Errore salvataggio mappa: %s", e)
plt.close(fig)
return False
def telegram_send_photo(photo_path: str, caption: str, chat_ids: Optional[List[str]] = None) -> bool:
"""
Invia foto via Telegram API.
Args:
photo_path: Percorso file immagine
caption: Didascalia foto (max 1024 caratteri)
chat_ids: Lista chat IDs (default: TELEGRAM_CHAT_IDS)
Returns:
True se inviata con successo, False altrimenti
"""
token = load_bot_token()
if not token:
LOGGER.warning("Telegram token missing: photo not sent.")
return False
if not os.path.exists(photo_path):
LOGGER.error("File foto non trovato: %s", photo_path)
return False
if chat_ids is None:
chat_ids = TELEGRAM_CHAT_IDS
url = f"https://api.telegram.org/bot{token}/sendPhoto"
# Limite Telegram per caption: 1024 caratteri
if len(caption) > 1024:
caption = caption[:1021] + "..."
sent_ok = False
with requests.Session() as s:
for chat_id in chat_ids:
try:
with open(photo_path, 'rb') as photo_file:
files = {'photo': photo_file}
data = {
'chat_id': chat_id,
'caption': caption,
'parse_mode': 'Markdown'
}
resp = s.post(url, files=files, data=data, timeout=30)
if resp.status_code == 200:
sent_ok = True
LOGGER.info("Foto inviata a chat_id=%s", chat_id)
else:
LOGGER.error("Telegram error chat_id=%s status=%s body=%s",
chat_id, resp.status_code, resp.text[:500])
time.sleep(0.5)
except Exception as e:
LOGGER.exception("Telegram exception chat_id=%s err=%s", chat_id, e)
return sent_ok
# =============================================================================
# Telegram
# =============================================================================
def telegram_send_markdown(message_md: str, chat_ids: Optional[List[str]] = None) -> bool:
"""Invia messaggio Markdown su Telegram. Divide in più messaggi se troppo lungo."""
token = load_bot_token()
if not token:
LOGGER.warning("Telegram token missing: message not sent.")
return False
if chat_ids is None:
chat_ids = TELEGRAM_CHAT_IDS
# Telegram limite: 4096 caratteri per messaggio
MAX_MESSAGE_LENGTH = 4000 # Lascia margine per encoding
url = f"https://api.telegram.org/bot{token}/sendMessage"
# Se il messaggio è troppo lungo, dividilo
if len(message_md) <= MAX_MESSAGE_LENGTH:
messages = [message_md]
else:
# Dividi per righe, mantenendo l'header nel primo messaggio
lines = message_md.split('\n')
messages = []
current_msg = []
current_len = 0
# Header (prime righe fino a "*Riepilogo per località*")
header_lines = []
header_end_idx = 0
for i, line in enumerate(lines):
if "*Riepilogo per località" in line:
header_end_idx = i + 1
break
header_lines.append(line)
header = '\n'.join(header_lines)
header_len = len(header)
# Primo messaggio: header + prime località
current_msg = header_lines.copy()
current_len = header_len
for i in range(header_end_idx, len(lines)):
line = lines[i]
line_len = len(line) + 1 # +1 per \n
if current_len + line_len > MAX_MESSAGE_LENGTH:
# Chiudi messaggio corrente
messages.append('\n'.join(current_msg))
# Nuovo messaggio (solo continuazione)
current_msg = [line]
current_len = line_len
else:
current_msg.append(line)
current_len += line_len
# Aggiungi ultimo messaggio
if current_msg:
messages.append('\n'.join(current_msg))
sent_ok = False
with requests.Session() as s:
for chat_id in chat_ids:
for msg_idx, msg_text in enumerate(messages):
payload = {
"chat_id": chat_id,
"text": msg_text,
"parse_mode": "Markdown",
"disable_web_page_preview": True,
}
try:
resp = s.post(url, json=payload, timeout=15)
if resp.status_code == 200:
sent_ok = True
else:
LOGGER.error("Telegram error chat_id=%s status=%s body=%s",
chat_id, resp.status_code, resp.text[:500])
time.sleep(0.25)
except Exception as e:
LOGGER.exception("Telegram exception chat_id=%s err=%s", chat_id, e)
return sent_ok
# =============================================================================
# Main
# =============================================================================
def main(chat_ids: Optional[List[str]] = None, debug_mode: bool = False, chat_id: Optional[str] = None) -> None:
LOGGER.info("--- Snow Radar ---")
# Se chat_id è specificato, usa quello (per chiamate da Telegram)
if chat_id:
chat_ids = [chat_id]
elif debug_mode and not chat_ids:
# In debug mode, default al primo chat ID (admin)
chat_ids = [TELEGRAM_CHAT_IDS[0]]
now = now_local()
# Centro: San Marino (per calcolo distanze)
CENTER_LAT = 43.9356
CENTER_LON = 12.4296
# Analizza località predefinite
LOGGER.info("Analisi %d località predefinite...", len(LOCATIONS))
with requests.Session() as session:
configure_open_meteo_session(session, headers=HTTP_HEADERS)
results = []
for i, loc in enumerate(LOCATIONS):
# Calcola distanza da San Marino
distance_km = calculate_distance_km(CENTER_LAT, CENTER_LON, loc["lat"], loc["lon"])
LOGGER.debug("Analizzando località %d/%d: %s (%.2f km)", i+1, len(LOCATIONS), loc["name"], distance_km)
data = get_forecast(session, loc["lat"], loc["lon"])
if not data:
continue
snow_analysis = analyze_snowfall_for_location(data, now)
if not snow_analysis:
continue
# Aggiungi sempre Casa, anche se non c'è neve
# Per le altre località, aggiungi solo se c'è neve (passata o prevista)
is_casa = loc["name"] == "Casa (Strada Cà Toro)"
has_snow = (snow_analysis["snow_past_12h"] > 0.0 or
snow_analysis["snow_next_12h"] > 0.0 or
snow_analysis["snow_next_24h"] > 0.0)
if is_casa or has_snow:
results.append({
"name": loc["name"],
"lat": loc["lat"],
"lon": loc["lon"],
"distance_km": distance_km,
**snow_analysis
})
# Rate limiting per Open-Meteo
time.sleep(0.1)
if not results:
LOGGER.info("Nessuna neve rilevata nelle località monitorate")
if debug_mode:
message = "❄️ *SNOW RADAR*\n\nNessuna neve rilevata nelle località monitorate."
telegram_send_markdown(message, chat_ids=chat_ids)
return
# Genera e invia DUE mappe separate
now_str = now.strftime('%d/%m/%Y %H:%M')
# 1. Mappa snowfall passato (12h precedenti)
map_path_past = os.path.join(BASE_DIR, "snow_radar_past.png")
map_generated_past = generate_snow_map(results, CENTER_LAT, CENTER_LON, map_path_past,
data_field="snow_past_12h",
title_suffix=" - Ultime 12h")
if map_generated_past:
caption_past = (
f"❄️ *SNOW RADAR - Ultime 12h*\n"
f"📍 Centro: San Marino\n"
f"🕒 {now_str}\n"
f"📊 Località con neve: {len(results)}/{len(LOCATIONS)}"
)
telegram_send_photo(map_path_past, caption_past, chat_ids=chat_ids)
# Pulisci file temporaneo
try:
if os.path.exists(map_path_past):
os.remove(map_path_past)
except Exception:
pass
# 2. Mappa snowfall futuro (24h successive)
map_path_future = os.path.join(BASE_DIR, "snow_radar_future.png")
map_generated_future = generate_snow_map(results, CENTER_LAT, CENTER_LON, map_path_future,
data_field="snow_next_24h",
title_suffix=" - Prossime 24h")
if map_generated_future:
caption_future = (
f"❄️ *SNOW RADAR - Prossime 24h*\n"
f"📍 Centro: San Marino\n"
f"🕒 {now_str}\n"
f"📊 Località con neve: {len(results)}/{len(LOCATIONS)}"
)
telegram_send_photo(map_path_future, caption_future, chat_ids=chat_ids)
# Pulisci file temporaneo
try:
if os.path.exists(map_path_future):
os.remove(map_path_future)
except Exception:
pass
if map_generated_past or map_generated_future:
LOGGER.info("Mappe inviate con successo (%d località, passato: %s, futuro: %s)",
len(results), "" if map_generated_past else "no",
"" if map_generated_future else "no")
else:
LOGGER.error("Errore generazione mappe")
if __name__ == "__main__":
arg_parser = argparse.ArgumentParser(description="Snow Radar - Analisi neve in griglia 30km")
arg_parser.add_argument("--debug", action="store_true", help="Invia messaggi solo all'admin (chat ID: %s)" % TELEGRAM_CHAT_IDS[0])
arg_parser.add_argument("--chat_id", type=str, help="Chat ID specifico per invio messaggio (override debug mode)")
args = arg_parser.parse_args()
# Se --chat_id è specificato, usa quello; altrimenti usa logica debug
chat_id = args.chat_id if args.chat_id else None
chat_ids = None if chat_id else ([TELEGRAM_CHAT_IDS[0]] if args.debug else None)
main(chat_ids=chat_ids, debug_mode=args.debug, chat_id=chat_id)