750 lines
29 KiB
Python
Executable File
750 lines
29 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import argparse
|
|
import datetime
|
|
import json
|
|
import logging
|
|
import os
|
|
import time
|
|
from logging.handlers import RotatingFileHandler
|
|
from typing import Dict, List, Optional, Tuple
|
|
from zoneinfo import ZoneInfo
|
|
|
|
import requests
|
|
from dateutil import parser
|
|
from open_meteo_client import configure_open_meteo_session
|
|
|
|
# =============================================================================
|
|
# snow_radar.py
|
|
#
|
|
# Scopo:
|
|
# Analizza la nevicata in una griglia di località in un raggio di 40km da San Marino.
|
|
# Per ciascuna località mostra:
|
|
# - Nome della località
|
|
# - Somma dello snowfall orario nelle 12 ore precedenti
|
|
# - Somma dello snowfall previsto nelle 12 ore successive
|
|
# - Somma dello snowfall previsto nelle 24 ore successive
|
|
#
|
|
# Modello meteo:
|
|
# meteofrance_seamless (AROME) per dati dettagliati
|
|
#
|
|
# Token Telegram:
|
|
# Nessun token in chiaro. Lettura in ordine:
|
|
# 1) env TELEGRAM_BOT_TOKEN
|
|
# 2) ~/.telegram_dpc_bot_token
|
|
# 3) /etc/telegram_dpc_bot_token
|
|
#
|
|
# Debug:
|
|
# python3 snow_radar.py --debug
|
|
#
|
|
# Log:
|
|
# ./snow_radar.log (stessa cartella dello script)
|
|
# =============================================================================
|
|
|
|
DEBUG = os.environ.get("DEBUG", "0").strip() == "1"
|
|
|
|
# ----------------- TELEGRAM -----------------
|
|
TELEGRAM_CHAT_IDS = ["64463169", "24827341", "132455422", "5405962012"]
|
|
TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token")
|
|
TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token"
|
|
|
|
# ----------------- CONFIGURAZIONE -----------------
|
|
# Elenco località da monitorare
|
|
LOCATIONS = [
|
|
{"name": "Casa (Strada Cà Toro)", "lat": 43.9356, "lon": 12.4296},
|
|
{"name": "Cerasolo", "lat": 43.9831, "lon": 12.5355}, # Frazione di San Marino, più a nord-est
|
|
{"name": "Rimini", "lat": 44.0678, "lon": 12.5695},
|
|
{"name": "Riccione", "lat": 44.0000, "lon": 12.6500},
|
|
{"name": "Cattolica", "lat": 43.9600, "lon": 12.7400},
|
|
{"name": "Pesaro", "lat": 43.9100, "lon": 12.9100},
|
|
{"name": "Morciano di Romagna", "lat": 43.9200, "lon": 12.6500},
|
|
{"name": "Sassocorvaro", "lat": 43.7800, "lon": 12.5000},
|
|
{"name": "Urbino", "lat": 43.7200, "lon": 12.6400},
|
|
{"name": "Frontino", "lat": 43.7600, "lon": 12.3800},
|
|
{"name": "Carpegna", "lat": 43.7819, "lon": 12.3346},
|
|
{"name": "Pennabilli", "lat": 43.8200, "lon": 12.2600},
|
|
{"name": "Miratoio", "lat": 43.8500, "lon": 12.3000}, # Approssimazione
|
|
{"name": "Sant'Agata Feltria", "lat": 43.8600, "lon": 12.2100},
|
|
{"name": "Novafeltria", "lat": 43.9000, "lon": 12.2900},
|
|
{"name": "Mercato Saraceno", "lat": 43.9500, "lon": 12.2000},
|
|
{"name": "Villa Verucchio", "lat": 44.0000, "lon": 12.4300},
|
|
{"name": "Santarcangelo di Romagna", "lat": 44.0600, "lon": 12.4500},
|
|
{"name": "Savignano sul Rubicone", "lat": 44.0900, "lon": 12.4000},
|
|
{"name": "Cesena", "lat": 44.1400, "lon": 12.2400},
|
|
{"name": "Bellaria-Igea Marina", "lat": 44.1400, "lon": 12.4800},
|
|
{"name": "Cervia", "lat": 44.2600, "lon": 12.3600},
|
|
{"name": "Roncofreddo", "lat": 44.0433, "lon": 12.3181},
|
|
{"name": "Torriana", "lat": 44.0400, "lon": 12.3800},
|
|
{"name": "Montescudo", "lat": 43.9167, "lon": 12.5333},
|
|
{"name": "Mercatino Conca", "lat": 43.8686, "lon": 12.4722},
|
|
{"name": "Macerata Feltria", "lat": 43.8033, "lon": 12.4418},
|
|
{"name": "Saludecio", "lat": 43.8750, "lon": 12.6667},
|
|
{"name": "Mondaino", "lat": 43.8500, "lon": 12.6833},
|
|
{"name": "Tavoleto", "lat": 43.8500, "lon": 12.6000},
|
|
]
|
|
|
|
# Timezone
|
|
TZ = "Europe/Berlin"
|
|
TZINFO = ZoneInfo(TZ)
|
|
|
|
# Modello meteo
|
|
MODEL_AROME = "meteofrance_seamless"
|
|
|
|
# File di log
|
|
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
|
LOG_FILE = os.path.join(BASE_DIR, "snow_radar.log")
|
|
|
|
# ----------------- OPEN-METEO -----------------
|
|
OPEN_METEO_URL = "https://api.open-meteo.com/v1/forecast"
|
|
HTTP_HEADERS = {"User-Agent": "snow-radar/1.0"}
|
|
|
|
# ----------------- REVERSE GEOCODING -----------------
|
|
# Usa Nominatim (OpenStreetMap) per ottenere nomi località
|
|
NOMINATIM_URL = "https://nominatim.openstreetmap.org/reverse"
|
|
NOMINATIM_HEADERS = {"User-Agent": "snow-radar/1.0"}
|
|
|
|
|
|
def setup_logger() -> logging.Logger:
|
|
logger = logging.getLogger("snow_radar")
|
|
logger.setLevel(logging.DEBUG if DEBUG else logging.INFO)
|
|
logger.handlers.clear()
|
|
|
|
fh = RotatingFileHandler(LOG_FILE, maxBytes=1_000_000, backupCount=5, encoding="utf-8")
|
|
fh.setLevel(logging.DEBUG)
|
|
fmt = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
|
|
fh.setFormatter(fmt)
|
|
logger.addHandler(fh)
|
|
|
|
if DEBUG:
|
|
sh = logging.StreamHandler()
|
|
sh.setLevel(logging.DEBUG)
|
|
sh.setFormatter(fmt)
|
|
logger.addHandler(sh)
|
|
|
|
return logger
|
|
|
|
|
|
LOGGER = setup_logger()
|
|
|
|
|
|
# =============================================================================
|
|
# Utility
|
|
# =============================================================================
|
|
def now_local() -> datetime.datetime:
|
|
return datetime.datetime.now(TZINFO)
|
|
|
|
|
|
def read_text_file(path: str) -> str:
|
|
try:
|
|
with open(path, "r", encoding="utf-8") as f:
|
|
return f.read().strip()
|
|
except FileNotFoundError:
|
|
return ""
|
|
except PermissionError:
|
|
LOGGER.debug("Permission denied reading %s", path)
|
|
return ""
|
|
except Exception as e:
|
|
LOGGER.exception("Error reading %s: %s", path, e)
|
|
return ""
|
|
|
|
|
|
def load_bot_token() -> str:
|
|
tok = os.environ.get("TELEGRAM_BOT_TOKEN", "").strip()
|
|
if tok:
|
|
return tok
|
|
tok = read_text_file(TOKEN_FILE_HOME)
|
|
if tok:
|
|
return tok
|
|
tok = read_text_file(TOKEN_FILE_ETC)
|
|
return tok.strip() if tok else ""
|
|
|
|
|
|
def parse_time_to_local(t: str) -> datetime.datetime:
|
|
dt = parser.isoparse(t)
|
|
if dt.tzinfo is None:
|
|
return dt.replace(tzinfo=TZINFO)
|
|
return dt.astimezone(TZINFO)
|
|
|
|
|
|
# =============================================================================
|
|
# Geografia
|
|
# =============================================================================
|
|
def calculate_distance_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
|
|
"""Calcola distanza in km tra due punti geografici."""
|
|
from math import radians, cos, sin, asin, sqrt
|
|
|
|
# Formula di Haversine
|
|
R = 6371 # Raggio Terra in km
|
|
dlat = radians(lat2 - lat1)
|
|
dlon = radians(lon2 - lon1)
|
|
a = sin(dlat/2)**2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon/2)**2
|
|
c = 2 * asin(sqrt(a))
|
|
return R * c
|
|
|
|
|
|
# =============================================================================
|
|
# Open-Meteo
|
|
# =============================================================================
|
|
def get_forecast(session: requests.Session, lat: float, lon: float) -> Optional[Dict]:
|
|
"""
|
|
Recupera previsioni meteo per una località.
|
|
"""
|
|
params = {
|
|
"latitude": lat,
|
|
"longitude": lon,
|
|
"hourly": "snowfall,weathercode",
|
|
"timezone": TZ,
|
|
"forecast_days": 2,
|
|
"models": MODEL_AROME,
|
|
}
|
|
|
|
try:
|
|
r = session.get(OPEN_METEO_URL, params=params, headers=HTTP_HEADERS, timeout=(5, 25))
|
|
if r.status_code == 400:
|
|
try:
|
|
j = r.json()
|
|
LOGGER.warning("Open-Meteo 400 (lat=%.4f lon=%.4f): %s", lat, lon, j.get("reason", j))
|
|
except Exception:
|
|
LOGGER.warning("Open-Meteo 400 (lat=%.4f lon=%.4f): %s", lat, lon, r.text[:300])
|
|
return None
|
|
r.raise_for_status()
|
|
return r.json()
|
|
except Exception as e:
|
|
LOGGER.warning("Open-Meteo error (lat=%.4f lon=%.4f): %s", lat, lon, str(e))
|
|
return None
|
|
|
|
|
|
def analyze_snowfall_for_location(data: Dict, now: datetime.datetime) -> Optional[Dict]:
|
|
"""
|
|
Analizza snowfall per una località.
|
|
|
|
Nota: Open-Meteo fornisce principalmente dati futuri. Per i dati passati,
|
|
includiamo anche le ore appena passate se disponibili nei dati hourly.
|
|
|
|
Returns:
|
|
Dict con:
|
|
- snow_past_12h: somma snowfall ultime 12 ore (cm) - se disponibile nei dati
|
|
- snow_next_12h: somma snowfall prossime 12 ore (cm)
|
|
- snow_next_24h: somma snowfall prossime 24 ore (cm)
|
|
"""
|
|
hourly = data.get("hourly", {}) or {}
|
|
times = hourly.get("time", []) or []
|
|
snowfall = hourly.get("snowfall", []) or []
|
|
|
|
if not times or not snowfall:
|
|
return None
|
|
|
|
# Converti timestamps
|
|
dt_list = [parse_time_to_local(t) for t in times]
|
|
|
|
# Calcola finestre temporali
|
|
past_12h_start = now - datetime.timedelta(hours=12)
|
|
next_12h_end = now + datetime.timedelta(hours=12)
|
|
next_24h_end = now + datetime.timedelta(hours=24)
|
|
|
|
snow_past_12h = 0.0
|
|
snow_next_12h = 0.0
|
|
snow_next_24h = 0.0
|
|
|
|
for i, dt in enumerate(dt_list):
|
|
snow_val = float(snowfall[i]) if i < len(snowfall) and snowfall[i] is not None else 0.0
|
|
|
|
# Ultime 12 ore (passato) - solo se i dati includono il passato
|
|
if dt < now and dt >= past_12h_start:
|
|
snow_past_12h += snow_val
|
|
|
|
# Prossime 12 ore
|
|
if now <= dt < next_12h_end:
|
|
snow_next_12h += snow_val
|
|
|
|
# Prossime 24 ore
|
|
if now <= dt < next_24h_end:
|
|
snow_next_24h += snow_val
|
|
|
|
return {
|
|
"snow_past_12h": snow_past_12h,
|
|
"snow_next_12h": snow_next_12h,
|
|
"snow_next_24h": snow_next_24h,
|
|
}
|
|
|
|
|
|
# =============================================================================
|
|
# Mappa Grafica
|
|
# =============================================================================
|
|
def generate_snow_map(results: List[Dict], center_lat: float, center_lon: float, output_path: str,
|
|
data_field: str = "snow_next_24h", title_suffix: str = "") -> bool:
|
|
"""
|
|
Genera una mappa grafica con punti colorati in base all'accumulo di neve.
|
|
|
|
Args:
|
|
results: Lista di dict con 'name', 'lat', 'lon', 'snow_past_12h', 'snow_next_12h', 'snow_next_24h'
|
|
center_lat: Latitudine centro (San Marino)
|
|
center_lon: Longitudine centro (San Marino)
|
|
output_path: Percorso file output PNG
|
|
data_field: Campo da usare per i colori ('snow_past_12h' o 'snow_next_24h')
|
|
title_suffix: Suffisso da aggiungere al titolo
|
|
|
|
Returns:
|
|
True se generata con successo, False altrimenti
|
|
"""
|
|
try:
|
|
import matplotlib
|
|
matplotlib.use('Agg') # Backend senza GUI
|
|
import matplotlib.pyplot as plt
|
|
import matplotlib.patches as mpatches
|
|
from matplotlib.colors import LinearSegmentedColormap
|
|
import numpy as np
|
|
except ImportError as e:
|
|
LOGGER.warning("matplotlib non disponibile: %s. Mappa non generata.", e)
|
|
return False
|
|
|
|
# Prova a importare contextily per mappa di sfondo
|
|
try:
|
|
import contextily as ctx
|
|
CONTEXTILY_AVAILABLE = True
|
|
except ImportError:
|
|
CONTEXTILY_AVAILABLE = False
|
|
LOGGER.warning("contextily non disponibile. Mappa generata senza sfondo geografico.")
|
|
|
|
if not results:
|
|
return False
|
|
|
|
# Estrai valori dal campo specificato
|
|
totals = [r.get(data_field, 0.0) for r in results]
|
|
max_total = max(totals) if totals else 1.0
|
|
min_total = min(totals) if totals else 0.0
|
|
|
|
# Estrai coordinate
|
|
lats = [r["lat"] for r in results]
|
|
lons = [r["lon"] for r in results]
|
|
names = [r["name"] for r in results]
|
|
|
|
# Crea figura
|
|
fig, ax = plt.subplots(figsize=(14, 12))
|
|
fig.patch.set_facecolor('white')
|
|
|
|
# Limiti fissi della mappa (più zoomata)
|
|
lat_min, lat_max = 43.7, 44.3
|
|
lon_min, lon_max = 12.1, 12.8
|
|
|
|
# Configura assi PRIMA di aggiungere lo sfondo
|
|
ax.set_xlim(lon_min, lon_max)
|
|
ax.set_ylim(lat_min, lat_max)
|
|
ax.set_aspect('equal', adjustable='box')
|
|
|
|
# Aggiungi mappa di sfondo OpenStreetMap se disponibile
|
|
if CONTEXTILY_AVAILABLE:
|
|
try:
|
|
# Aggiungi tile OpenStreetMap (contextily gestisce automaticamente la conversione)
|
|
ctx.add_basemap(ax, crs='EPSG:4326', source=ctx.providers.OpenStreetMap.Mapnik,
|
|
alpha=0.6, attribution_size=6)
|
|
LOGGER.debug("Mappa OpenStreetMap aggiunta come sfondo")
|
|
except Exception as e:
|
|
LOGGER.warning("Errore aggiunta mappa sfondo: %s. Continuo senza sfondo.", e)
|
|
# Non reimpostare CONTEXTILY_AVAILABLE qui, solo logga l'errore
|
|
|
|
# Disegna punti con colore basato su accumulo totale
|
|
# Colori: blu (poco) -> verde -> giallo -> arancione -> rosso (molto)
|
|
cmap = LinearSegmentedColormap.from_list('snow',
|
|
['#1E90FF', '#00CED1', '#32CD32', '#FFD700', '#FF8C00', '#FF4500', '#8B0000'])
|
|
|
|
scatter = ax.scatter(lons, lats, c=totals, s=250, cmap=cmap,
|
|
vmin=min_total, vmax=max_total,
|
|
edgecolors='black', linewidths=2, alpha=0.85, zorder=5)
|
|
|
|
# Posizionamento personalizzato per etichette specifiche
|
|
label_positions = {
|
|
"Casa (Strada Cà Toro)": (-20, 20), # Più alto e più a sx
|
|
"Cervia": (0, 20), # Più in alto
|
|
"Savignano sul Rubicone": (-15, 15), # Alto a sx
|
|
"Rimini": (0, 20), # Più in alto
|
|
"Santarcangelo di Romagna": (0, -20), # Più in basso
|
|
"Riccione": (0, -20), # Più in basso
|
|
"Morciano di Romagna": (0, -20), # Più in basso
|
|
"Miratoio": (0, -20), # Più in basso
|
|
"Carpegna": (-20, -25), # Più in basso e più a sx
|
|
"Pennabilli": (0, -20), # Più in basso
|
|
"Mercato Saraceno": (0, 20), # Più in alto
|
|
"Sant'Agata Feltria": (-20, 15), # Più a sx
|
|
"Villa Verucchio": (0, -25), # Più in basso
|
|
"Roncofreddo": (-15, 15), # Alto a sx
|
|
"Torriana": (-15, 15), # Alto a sx
|
|
"Cerasolo": (15, 0), # Più a dx
|
|
"Mercatino Conca": (0, -20), # Più in basso
|
|
"Novafeltria": (10, 0), # Leggermente più a dx
|
|
"Urbino": (0, 20), # Più in alto
|
|
"Saludecio": (15, -15), # Più in basso
|
|
"Macerata Feltria": (20, 0), # Più a dx
|
|
"Mondaino": (15, -15), # Basso a dx
|
|
"Tavoleto": (15, -15), # Basso a dx
|
|
}
|
|
|
|
# Offset di default per altre località
|
|
default_offsets = [
|
|
(8, 8), (8, -12), (-12, 8), (-12, -12), # 4 direzioni base
|
|
(0, 15), (0, -15), (15, 0), (-15, 0), # 4 direzioni intermedie
|
|
(10, 10), (-10, 10), (10, -10), (-10, -10) # Diagonali
|
|
]
|
|
|
|
for i, (lon, lat, name, total) in enumerate(zip(lons, lats, names, totals)):
|
|
# Usa posizionamento personalizzato se disponibile, altrimenti offset ciclico
|
|
if name in label_positions:
|
|
xytext = label_positions[name]
|
|
else:
|
|
offset_idx = i % len(default_offsets)
|
|
xytext = default_offsets[offset_idx]
|
|
|
|
# Font size basato su importanza
|
|
fontsize = 9 if total > 5 or name in ["Cerasolo", "Carpegna", "Rimini", "Pesaro"] else 8
|
|
|
|
# Salta Casa qui, la gestiamo separatamente
|
|
if name == "Casa (Strada Cà Toro)":
|
|
continue
|
|
|
|
ax.annotate(name, (lon, lat), xytext=xytext, textcoords='offset points',
|
|
fontsize=fontsize, fontweight='bold',
|
|
bbox=dict(boxstyle='round,pad=0.4', facecolor='white', alpha=0.9,
|
|
edgecolor='black', linewidth=1),
|
|
zorder=6)
|
|
|
|
# Aggiungi punto Casa (Strada Cà Toro) - più grande e visibile, etichetta solo "Casa"
|
|
casa_lat = 43.9356
|
|
casa_lon = 12.4296
|
|
casa_name = "Casa (Strada Cà Toro)"
|
|
casa_value = next((r.get(data_field, 0.0) for r in results if r.get("name") == casa_name), 0.0)
|
|
casa_color = cmap((casa_value - min_total) / (max_total - min_total) if max_total > min_total else 0.5)
|
|
ax.scatter([casa_lon], [casa_lat], s=350, c=[casa_color],
|
|
edgecolors='black', linewidths=2.5, zorder=7, marker='s') # Quadrato per Casa
|
|
ax.annotate('Casa', (casa_lon, casa_lat),
|
|
xytext=(-20, 20), textcoords='offset points', # Più alto e più a sx
|
|
fontsize=11, fontweight='bold',
|
|
bbox=dict(boxstyle='round,pad=0.6', facecolor='white', alpha=0.95,
|
|
edgecolor='black', linewidth=2),
|
|
zorder=8)
|
|
|
|
# Colorbar (spostata a destra) - label dinamica in base al campo
|
|
label_text = 'Accumulo Neve (cm)'
|
|
if data_field == "snow_past_12h":
|
|
label_text = 'Accumulo Neve Ultime 12h (cm)'
|
|
elif data_field == "snow_next_24h":
|
|
label_text = 'Accumulo Neve Prossime 24h (cm)'
|
|
|
|
cbar = plt.colorbar(scatter, ax=ax, label=label_text,
|
|
shrink=0.7, pad=0.02, location='right')
|
|
cbar.ax.set_ylabel(label_text, fontsize=11, fontweight='bold')
|
|
cbar.ax.tick_params(labelsize=9)
|
|
|
|
# Configura assi (etichette)
|
|
ax.set_xlabel('Longitudine (°E)', fontsize=12, fontweight='bold')
|
|
ax.set_ylabel('Latitudine (°N)', fontsize=12, fontweight='bold')
|
|
title = f'❄️ SNOW RADAR - Analisi Neve 30km da San Marino{title_suffix}'
|
|
ax.set_title(title, fontsize=15, fontweight='bold', pad=20)
|
|
|
|
# Griglia solo se non c'è mappa di sfondo
|
|
if not CONTEXTILY_AVAILABLE:
|
|
ax.grid(True, alpha=0.3, linestyle='--', zorder=1)
|
|
|
|
# Legenda spostata in basso a sinistra (non si sovrappone ai dati)
|
|
legend_elements = [
|
|
mpatches.Patch(facecolor='#1E90FF', label='0-1 cm'),
|
|
mpatches.Patch(facecolor='#32CD32', label='1-3 cm'),
|
|
mpatches.Patch(facecolor='#FFD700', label='3-5 cm'),
|
|
mpatches.Patch(facecolor='#FF8C00', label='5-10 cm'),
|
|
mpatches.Patch(facecolor='#FF4500', label='10-20 cm'),
|
|
mpatches.Patch(facecolor='#8B0000', label='>20 cm'),
|
|
]
|
|
ax.legend(handles=legend_elements, loc='lower left', fontsize=10,
|
|
framealpha=0.95, edgecolor='black', fancybox=True, shadow=True)
|
|
|
|
# Info timestamp spostata in alto a destra
|
|
now = now_local()
|
|
info_text = f"Aggiornamento: {now.strftime('%d/%m/%Y %H:%M')}\nLocalità con neve: {len(results)}"
|
|
ax.text(0.98, 0.98, info_text, transform=ax.transAxes,
|
|
fontsize=9, verticalalignment='top', horizontalalignment='right',
|
|
bbox=dict(boxstyle='round,pad=0.5', facecolor='white', alpha=0.9,
|
|
edgecolor='gray', linewidth=1.5),
|
|
zorder=10)
|
|
|
|
plt.tight_layout()
|
|
|
|
# Salva
|
|
try:
|
|
plt.savefig(output_path, dpi=150, bbox_inches='tight', facecolor='white')
|
|
plt.close(fig)
|
|
LOGGER.info("Mappa salvata: %s", output_path)
|
|
return True
|
|
except Exception as e:
|
|
LOGGER.exception("Errore salvataggio mappa: %s", e)
|
|
plt.close(fig)
|
|
return False
|
|
|
|
|
|
def telegram_send_photo(photo_path: str, caption: str, chat_ids: Optional[List[str]] = None) -> bool:
|
|
"""
|
|
Invia foto via Telegram API.
|
|
|
|
Args:
|
|
photo_path: Percorso file immagine
|
|
caption: Didascalia foto (max 1024 caratteri)
|
|
chat_ids: Lista chat IDs (default: TELEGRAM_CHAT_IDS)
|
|
|
|
Returns:
|
|
True se inviata con successo, False altrimenti
|
|
"""
|
|
token = load_bot_token()
|
|
if not token:
|
|
LOGGER.warning("Telegram token missing: photo not sent.")
|
|
return False
|
|
|
|
if not os.path.exists(photo_path):
|
|
LOGGER.error("File foto non trovato: %s", photo_path)
|
|
return False
|
|
|
|
if chat_ids is None:
|
|
chat_ids = TELEGRAM_CHAT_IDS
|
|
|
|
url = f"https://api.telegram.org/bot{token}/sendPhoto"
|
|
|
|
# Limite Telegram per caption: 1024 caratteri
|
|
if len(caption) > 1024:
|
|
caption = caption[:1021] + "..."
|
|
|
|
sent_ok = False
|
|
with requests.Session() as s:
|
|
for chat_id in chat_ids:
|
|
try:
|
|
with open(photo_path, 'rb') as photo_file:
|
|
files = {'photo': photo_file}
|
|
data = {
|
|
'chat_id': chat_id,
|
|
'caption': caption,
|
|
'parse_mode': 'Markdown'
|
|
}
|
|
resp = s.post(url, files=files, data=data, timeout=30)
|
|
if resp.status_code == 200:
|
|
sent_ok = True
|
|
LOGGER.info("Foto inviata a chat_id=%s", chat_id)
|
|
else:
|
|
LOGGER.error("Telegram error chat_id=%s status=%s body=%s",
|
|
chat_id, resp.status_code, resp.text[:500])
|
|
time.sleep(0.5)
|
|
except Exception as e:
|
|
LOGGER.exception("Telegram exception chat_id=%s err=%s", chat_id, e)
|
|
|
|
return sent_ok
|
|
|
|
|
|
# =============================================================================
|
|
# Telegram
|
|
# =============================================================================
|
|
def telegram_send_markdown(message_md: str, chat_ids: Optional[List[str]] = None) -> bool:
|
|
"""Invia messaggio Markdown su Telegram. Divide in più messaggi se troppo lungo."""
|
|
token = load_bot_token()
|
|
if not token:
|
|
LOGGER.warning("Telegram token missing: message not sent.")
|
|
return False
|
|
|
|
if chat_ids is None:
|
|
chat_ids = TELEGRAM_CHAT_IDS
|
|
|
|
# Telegram limite: 4096 caratteri per messaggio
|
|
MAX_MESSAGE_LENGTH = 4000 # Lascia margine per encoding
|
|
|
|
url = f"https://api.telegram.org/bot{token}/sendMessage"
|
|
|
|
# Se il messaggio è troppo lungo, dividilo
|
|
if len(message_md) <= MAX_MESSAGE_LENGTH:
|
|
messages = [message_md]
|
|
else:
|
|
# Dividi per righe, mantenendo l'header nel primo messaggio
|
|
lines = message_md.split('\n')
|
|
messages = []
|
|
current_msg = []
|
|
current_len = 0
|
|
|
|
# Header (prime righe fino a "*Riepilogo per località*")
|
|
header_lines = []
|
|
header_end_idx = 0
|
|
for i, line in enumerate(lines):
|
|
if "*Riepilogo per località" in line:
|
|
header_end_idx = i + 1
|
|
break
|
|
header_lines.append(line)
|
|
|
|
header = '\n'.join(header_lines)
|
|
header_len = len(header)
|
|
|
|
# Primo messaggio: header + prime località
|
|
current_msg = header_lines.copy()
|
|
current_len = header_len
|
|
|
|
for i in range(header_end_idx, len(lines)):
|
|
line = lines[i]
|
|
line_len = len(line) + 1 # +1 per \n
|
|
|
|
if current_len + line_len > MAX_MESSAGE_LENGTH:
|
|
# Chiudi messaggio corrente
|
|
messages.append('\n'.join(current_msg))
|
|
# Nuovo messaggio (solo continuazione)
|
|
current_msg = [line]
|
|
current_len = line_len
|
|
else:
|
|
current_msg.append(line)
|
|
current_len += line_len
|
|
|
|
# Aggiungi ultimo messaggio
|
|
if current_msg:
|
|
messages.append('\n'.join(current_msg))
|
|
|
|
sent_ok = False
|
|
with requests.Session() as s:
|
|
for chat_id in chat_ids:
|
|
for msg_idx, msg_text in enumerate(messages):
|
|
payload = {
|
|
"chat_id": chat_id,
|
|
"text": msg_text,
|
|
"parse_mode": "Markdown",
|
|
"disable_web_page_preview": True,
|
|
}
|
|
try:
|
|
resp = s.post(url, json=payload, timeout=15)
|
|
if resp.status_code == 200:
|
|
sent_ok = True
|
|
else:
|
|
LOGGER.error("Telegram error chat_id=%s status=%s body=%s",
|
|
chat_id, resp.status_code, resp.text[:500])
|
|
time.sleep(0.25)
|
|
except Exception as e:
|
|
LOGGER.exception("Telegram exception chat_id=%s err=%s", chat_id, e)
|
|
|
|
return sent_ok
|
|
|
|
|
|
# =============================================================================
|
|
# Main
|
|
# =============================================================================
|
|
def main(chat_ids: Optional[List[str]] = None, debug_mode: bool = False, chat_id: Optional[str] = None) -> None:
|
|
LOGGER.info("--- Snow Radar ---")
|
|
|
|
# Se chat_id è specificato, usa quello (per chiamate da Telegram)
|
|
if chat_id:
|
|
chat_ids = [chat_id]
|
|
elif debug_mode and not chat_ids:
|
|
# In debug mode, default al primo chat ID (admin)
|
|
chat_ids = [TELEGRAM_CHAT_IDS[0]]
|
|
|
|
now = now_local()
|
|
|
|
# Centro: San Marino (per calcolo distanze)
|
|
CENTER_LAT = 43.9356
|
|
CENTER_LON = 12.4296
|
|
|
|
# Analizza località predefinite
|
|
LOGGER.info("Analisi %d località predefinite...", len(LOCATIONS))
|
|
with requests.Session() as session:
|
|
configure_open_meteo_session(session, headers=HTTP_HEADERS)
|
|
results = []
|
|
for i, loc in enumerate(LOCATIONS):
|
|
# Calcola distanza da San Marino
|
|
distance_km = calculate_distance_km(CENTER_LAT, CENTER_LON, loc["lat"], loc["lon"])
|
|
|
|
LOGGER.debug("Analizzando località %d/%d: %s (%.2f km)", i+1, len(LOCATIONS), loc["name"], distance_km)
|
|
|
|
data = get_forecast(session, loc["lat"], loc["lon"])
|
|
if not data:
|
|
continue
|
|
|
|
snow_analysis = analyze_snowfall_for_location(data, now)
|
|
if not snow_analysis:
|
|
continue
|
|
|
|
# Aggiungi sempre Casa, anche se non c'è neve
|
|
# Per le altre località, aggiungi solo se c'è neve (passata o prevista)
|
|
is_casa = loc["name"] == "Casa (Strada Cà Toro)"
|
|
has_snow = (snow_analysis["snow_past_12h"] > 0.0 or
|
|
snow_analysis["snow_next_12h"] > 0.0 or
|
|
snow_analysis["snow_next_24h"] > 0.0)
|
|
|
|
if is_casa or has_snow:
|
|
results.append({
|
|
"name": loc["name"],
|
|
"lat": loc["lat"],
|
|
"lon": loc["lon"],
|
|
"distance_km": distance_km,
|
|
**snow_analysis
|
|
})
|
|
|
|
# Rate limiting per Open-Meteo
|
|
time.sleep(0.1)
|
|
|
|
if not results:
|
|
LOGGER.info("Nessuna neve rilevata nelle località monitorate")
|
|
if debug_mode:
|
|
message = "❄️ *SNOW RADAR*\n\nNessuna neve rilevata nelle località monitorate."
|
|
telegram_send_markdown(message, chat_ids=chat_ids)
|
|
return
|
|
|
|
# Genera e invia DUE mappe separate
|
|
now_str = now.strftime('%d/%m/%Y %H:%M')
|
|
|
|
# 1. Mappa snowfall passato (12h precedenti)
|
|
map_path_past = os.path.join(BASE_DIR, "snow_radar_past.png")
|
|
map_generated_past = generate_snow_map(results, CENTER_LAT, CENTER_LON, map_path_past,
|
|
data_field="snow_past_12h",
|
|
title_suffix=" - Ultime 12h")
|
|
if map_generated_past:
|
|
caption_past = (
|
|
f"❄️ *SNOW RADAR - Ultime 12h*\n"
|
|
f"📍 Centro: San Marino\n"
|
|
f"🕒 {now_str}\n"
|
|
f"📊 Località con neve: {len(results)}/{len(LOCATIONS)}"
|
|
)
|
|
telegram_send_photo(map_path_past, caption_past, chat_ids=chat_ids)
|
|
# Pulisci file temporaneo
|
|
try:
|
|
if os.path.exists(map_path_past):
|
|
os.remove(map_path_past)
|
|
except Exception:
|
|
pass
|
|
|
|
# 2. Mappa snowfall futuro (24h successive)
|
|
map_path_future = os.path.join(BASE_DIR, "snow_radar_future.png")
|
|
map_generated_future = generate_snow_map(results, CENTER_LAT, CENTER_LON, map_path_future,
|
|
data_field="snow_next_24h",
|
|
title_suffix=" - Prossime 24h")
|
|
if map_generated_future:
|
|
caption_future = (
|
|
f"❄️ *SNOW RADAR - Prossime 24h*\n"
|
|
f"📍 Centro: San Marino\n"
|
|
f"🕒 {now_str}\n"
|
|
f"📊 Località con neve: {len(results)}/{len(LOCATIONS)}"
|
|
)
|
|
telegram_send_photo(map_path_future, caption_future, chat_ids=chat_ids)
|
|
# Pulisci file temporaneo
|
|
try:
|
|
if os.path.exists(map_path_future):
|
|
os.remove(map_path_future)
|
|
except Exception:
|
|
pass
|
|
|
|
if map_generated_past or map_generated_future:
|
|
LOGGER.info("Mappe inviate con successo (%d località, passato: %s, futuro: %s)",
|
|
len(results), "sì" if map_generated_past else "no",
|
|
"sì" if map_generated_future else "no")
|
|
else:
|
|
LOGGER.error("Errore generazione mappe")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
arg_parser = argparse.ArgumentParser(description="Snow Radar - Analisi neve in griglia 30km")
|
|
arg_parser.add_argument("--debug", action="store_true", help="Invia messaggi solo all'admin (chat ID: %s)" % TELEGRAM_CHAT_IDS[0])
|
|
arg_parser.add_argument("--chat_id", type=str, help="Chat ID specifico per invio messaggio (override debug mode)")
|
|
args = arg_parser.parse_args()
|
|
|
|
# Se --chat_id è specificato, usa quello; altrimenti usa logica debug
|
|
chat_id = args.chat_id if args.chat_id else None
|
|
chat_ids = None if chat_id else ([TELEGRAM_CHAT_IDS[0]] if args.debug else None)
|
|
|
|
main(chat_ids=chat_ids, debug_mode=args.debug, chat_id=chat_id)
|