Files
loogle-scripts/services/telegram-bot/previsione7.py

264 lines
9.6 KiB
Python
Executable File

#!/usr/bin/env python3
import requests
import argparse
import datetime
import os
import sys
from zoneinfo import ZoneInfo
from collections import defaultdict
# --- DEFAULT CONFIGURATION ---
# Coordinates and display name returned by get_coordinates() when the query
# is empty or equals "casa".
DEFAULT_LAT = 43.9356
DEFAULT_LON = 12.4296
DEFAULT_NAME = "🏠 Casa (Strada Cà Toro)"
# --- TIMEZONE ---
# Sent as the "timezone" parameter of the forecast request.
TZ_STR = "Europe/Rome"
# --- TELEGRAM CONFIG ---
# Destination chat used by main() when --chat_id is absent or --debug is set.
ADMIN_CHAT_ID = "64463169"
# Candidate files holding the bot token; get_bot_token() tries the /etc path
# first, then the one in the user's home directory.
TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token")
TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token"
# --- THRESHOLDS ---
# A wind event is reported only when the day's peak wind speed exceeds this.
SOGLIA_VENTO_KMH = 40.0
# An hour counts as a precipitation hour only at or above this amount.
MIN_MM_PER_EVENTO = 0.1
def get_bot_token():
    """Return the Telegram bot token read from the first usable token file.

    The candidates are tried in order: ``TOKEN_FILE_ETC`` first, then
    ``TOKEN_FILE_HOME``. A file that is missing, unreadable, undecodable or
    that contains only whitespace is skipped so the next candidate is tried.

    Returns:
        The token with surrounding whitespace stripped.

    Raises:
        SystemExit: with exit status 1 when no candidate yields a token.
    """
    for path in (TOKEN_FILE_ETC, TOKEN_FILE_HOME):
        try:
            with open(path, "r", encoding="utf-8") as handle:
                token = handle.read().strip()
        except (OSError, UnicodeDecodeError):
            # Missing or unusable file: fall through to the next candidate.
            continue
        if token:
            return token
    # Explain the failure before exiting; the exit status stays 1.
    print(
        f"Telegram bot token not found in {TOKEN_FILE_ETC} or {TOKEN_FILE_HOME}",
        file=sys.stderr,
    )
    sys.exit(1)
def get_coordinates(query):
    """Resolve a place name to ``(latitude, longitude, label)``.

    An empty query, or the keyword "casa" in any letter case, yields the
    configured default location without any network access. Any other query
    is sent to the Open-Meteo geocoding endpoint and its first result is used.

    Args:
        query: Free-text place name, or None/"" / "casa" for the default.

    Returns:
        A ``(lat, lon, label)`` tuple, or ``(None, None, None)`` when the
        request fails, the response cannot be decoded, or nothing matches.
    """
    if not query or query.lower() == "casa":
        return DEFAULT_LAT, DEFAULT_LON, DEFAULT_NAME

    url = "https://geocoding-api.open-meteo.com/v1/search"
    params = {"name": query, "count": 1, "language": "it", "format": "json"}
    try:
        resp = requests.get(url, params=params, timeout=5)
        resp.raise_for_status()
        matches = resp.json().get("results") or []
        if not matches:
            # The API returned no match for this name.
            return None, None, None
        best = matches[0]
        label = f"{best.get('name')} ({best.get('country_code','')})"
        return best['latitude'], best['longitude'], label
    except (requests.RequestException, ValueError, KeyError, TypeError, AttributeError):
        # Network/HTTP failure, invalid JSON, or an unexpected payload shape.
        return None, None, None
def get_weather(lat, lon):
    """Fetch the hourly and daily forecast for a point from Open-Meteo.

    Args:
        lat: Latitude passed as the ``latitude`` request parameter.
        lon: Longitude passed as the ``longitude`` request parameter.

    Returns:
        The decoded JSON response, or None when the request fails, the server
        answers with an HTTP error status, or the body is not valid JSON.
    """
    url = "https://api.open-meteo.com/v1/forecast"
    params = {
        "latitude": lat, "longitude": lon,
        "hourly": "temperature_2m,precipitation_probability,precipitation,weathercode,windspeed_10m,dewpoint_2m",
        "daily": "temperature_2m_max,temperature_2m_min,sunrise,sunset",
        "timezone": TZ_STR, "models": "best_match", "forecast_days": 8
    }
    try:
        resp = requests.get(url, params=params, timeout=10)
        resp.raise_for_status()
        return resp.json()
    except (requests.RequestException, ValueError) as exc:
        # Keep the "None on failure" contract but leave a trace of the cause.
        print(f"Open-Meteo forecast request failed: {exc}", file=sys.stderr)
        return None
def get_precip_type(code):
    """Map a WMO weather code to the precipitation label shown in the report.

    Codes 71-77, 85 and 86 are labelled snow, 96 and 99 hail, 66 and 67
    freezing rain; every other code falls back to plain rain.
    """
    snow = "❄️ Neve"
    # The snow range is tested by comparison; the isolated codes by lookup.
    if 71 <= code <= 77:
        return snow
    labels_by_code = {
        85: snow,
        86: snow,
        96: "⚡🌨 Grandine",
        99: "⚡🌨 Grandine",
        66: "🧊☔ Pioggia Congelantesi",
        67: "🧊☔ Pioggia Congelantesi",
    }
    return labels_by_code.get(code, "☔ Pioggia")
def get_intensity_label(mm_h):
    """Return the intensity label for a precipitation rate.

    Rates below 2.5 are "Debole", rates below 7.6 are "Moderata", and
    anything else is "Forte ⚠️".
    """
    # Upper bounds are checked in ascending order; the first one that the
    # rate stays under decides the label.
    for upper_bound, label in ((2.5, "Debole"), (7.6, "Moderata")):
        if mm_h < upper_bound:
            return label
    return "Forte ⚠️"
def _hhmm(timestamp):
    """Return the HH:MM part of a 'date' + 'T' + 'time' timestamp string."""
    return timestamp.split("T")[1][:5]


def _ice_condition(temp, dewpoint, precip_mm, code):
    """Return the ice-hazard label for one hour, or None when there is none."""
    is_rain_code = (50 <= code <= 69) or (80 <= code <= 82)
    # Freezing rain: explicit codes 66/67, or a rain code with
    # precipitation at or below 0 degrees.
    if code in (66, 67) or (precip_mm > 0 and temp <= 0 and is_rain_code):
        return "🧊☠️ GELICIDIO"
    # Ice/frost: no precipitation, temperature at most 2 degrees and a
    # temperature/dew-point spread under 1 degree.
    if precip_mm == 0 and temp <= 2.0 and (temp - dewpoint) < 1.0:
        return "⛸️⚠️ GHIACCIO/BRINA (Strada Scivolosa)"
    # Plain frost: below zero without the conditions above.
    if temp < 0:
        return "❄️ Gelata notturna"
    return None


def _label_runs(labels):
    """Yield (label, first_index, last_index) per run of equal non-None labels."""
    start = None
    for i, label in enumerate(labels):
        if start is not None and label != labels[start]:
            yield labels[start], start, i - 1
            start = None
        if label is not None and start is None:
            start = i
    # A run still open after the final hour must be reported as well.
    if start is not None:
        yield labels[start], start, len(labels) - 1


def _ice_events(times, codes, precip, temps, dewpoints):
    """Build one report line per continuous block of the same ice hazard."""
    labels = [
        _ice_condition(t, d, p, c)
        for t, d, p, c in zip(temps, dewpoints, precip, codes)
    ]
    last_hour = len(labels) - 1
    lines = []
    for label, first, last in _label_runs(labels):
        # The displayed end is the first hour after the block, capped at the
        # final hour available for the day.
        shown_end = min(last + 1, last_hour)
        # Minimum over the block's own hours only.
        coldest = min(temps[first:last + 1])
        lines.append(
            f"{label}: {_hhmm(times[first])}-{_hhmm(times[shown_end])} (Min: {coldest}°C)"
        )
    return lines


def _precip_events(times, codes, probs, precip):
    """Build one report entry per continuous block of one precipitation type."""
    kinds = [
        get_precip_type(code) if mm >= MIN_MM_PER_EVENTO else None
        for code, mm in zip(codes, precip)
    ]
    entries = []
    for kind, first, last in _label_runs(kinds):
        block = precip[first:last + 1]
        total_mm = sum(block)
        if total_mm <= 0:
            continue
        # When the next hour continues with a different type, both blocks
        # share that boundary hour as end/start; otherwise the displayed end
        # is the block's own last hour.
        type_changes_next = last + 1 < len(kinds) and kinds[last + 1] is not None
        shown_end = last + 1 if type_changes_next else last
        peak_prob = max(probs[first:last + 1])
        mean_rate = total_mm / len(block)
        entries.append(
            f"{kind} ({get_intensity_label(mean_rate)}):\n"
            f" 🕒 {_hhmm(times[first])}-{_hhmm(times[shown_end])} | 💧 {total_mm:.1f}mm | Prob: {peak_prob}%"
        )
    return entries


def _wind_event(times, winds):
    """Return a one-item list with the wind-peak line, or an empty list."""
    if not winds:
        return []
    peak = max(winds)
    if peak > SOGLIA_VENTO_KMH:
        peak_time = _hhmm(times[winds.index(peak)])
        return [f"💨 Vento Forte: Picco {peak}km/h alle {peak_time}"]
    return []


def analyze_daily_events(times, codes, probs, precip, winds, temps, dewpoints):
    """Scan one day's hourly series and describe its continuous events.

    All arguments are parallel per-hour sequences and are expected to have
    the same length. The result lists, in this order:

    1. ice hazards (freezing rain, ice/frost, plain frost), one line per
       continuous block of the same hazard, with the block's minimum
       temperature;
    2. precipitation blocks, split whenever the precipitation type changes,
       with total amount, intensity label and peak probability;
    3. a single wind line when the peak wind exceeds ``SOGLIA_VENTO_KMH``.

    Ice and precipitation are reported independently, so an hour may appear
    in both. Blocks that begin in the final hour are reported too.

    Args:
        times: Timestamps containing a "T" separator before the HH:MM part.
        codes: WMO weather codes.
        probs: Precipitation probabilities.
        precip: Precipitation amounts.
        winds: Wind speeds.
        temps: Temperatures.
        dewpoints: Dew points.

    Returns:
        A list of display strings; empty when nothing is noteworthy or when
        the input sequences are empty.
    """
    events = _ice_events(times, codes, precip, temps, dewpoints)
    events.extend(_precip_events(times, codes, probs, precip))
    events.extend(_wind_event(times, winds))
    return events
def format_report(data, location_name):
    """Build the HTML-formatted Telegram message for the first seven days.

    Hourly samples are grouped by calendar date (the part of each timestamp
    before "T"); each day gets a header with its min/max temperature followed
    by the lines produced by ``analyze_daily_events``.

    Args:
        data: Forecast payload with an ``hourly`` mapping (time, weathercode,
            precipitation_probability, precipitation, windspeed_10m,
            temperature_2m, dewpoint_2m) and a ``daily`` mapping.
        location_name: Label shown upper-cased in the title.

    Returns:
        The message text, using <b>/<i> tags for Telegram's HTML parse mode.
    """
    # Local import: escaping is only needed here and the name is not among
    # the module-level imports.
    import html

    hourly = data['hourly']
    daily = data['daily']

    # The label may come from an external geocoder; escape the characters
    # that are significant in HTML so they cannot break the markup.
    title = html.escape(location_name.upper(), quote=False)
    parts = [
        f"🌍 <b>METEO ALERT: {title}</b>\n",
        "📡 <i>Modelli: AROME/ICON HD</i>\n\n",
    ]

    indices_by_day = defaultdict(list)
    for i, stamp in enumerate(hourly['time']):
        indices_by_day[stamp.split("T")[0]].append(i)

    for count, (day_date, indices) in enumerate(indices_by_day.items()):
        if count >= 7:
            break

        def pick(key):
            """Return this day's slice of one hourly series."""
            series = hourly[key]
            return [series[i] for i in indices]

        d_times = pick('time')
        d_codes = pick('weathercode')
        d_probs = pick('precipitation_probability')
        d_precip = pick('precipitation')
        d_winds = pick('windspeed_10m')
        d_temps = pick('temperature_2m')
        d_dews = pick('dewpoint_2m')

        # Daily aggregates are matched to the day by position; when they are
        # missing or null, derive the values from the hourly temperatures.
        try:
            t_min = daily['temperature_2m_min'][count]
            t_max = daily['temperature_2m_max'][count]
        except (KeyError, IndexError, TypeError):
            t_min = t_max = None
        if t_min is None:
            t_min = min(d_temps)
        if t_max is None:
            t_max = max(d_temps)

        events_list = analyze_daily_events(
            d_times, d_codes, d_probs, d_precip, d_winds, d_temps, d_dews
        )

        day_str = datetime.datetime.strptime(day_date, "%Y-%m-%d").strftime("%a %d/%m")
        parts.append(f"📅 <b>{day_str}</b> 🌡️ {t_min:.0f}°/{t_max:.0f}°C\n")
        if events_list:
            parts.extend(f"{ev}\n" for ev in events_list)
        else:
            parts.append(" ✅ <i>Nessun fenomeno rilevante</i>\n")
        parts.append("\n")

    return "".join(parts)
def send_telegram(text, chat_id, token):
    """Send ``text`` to a chat through the Telegram Bot API ``sendMessage``.

    The message is sent with ``parse_mode`` set to HTML. Network errors
    raised by ``requests`` propagate to the caller; a response with a
    non-success status is reported on stderr.

    Args:
        text: Message body.
        chat_id: Destination chat identifier.
        token: Bot token used to build the API URL.
    """
    resp = requests.post(
        f"https://api.telegram.org/bot{token}/sendMessage",
        json={"chat_id": chat_id, "text": text, "parse_mode": "HTML"},
        # Without a timeout a stalled connection would block forever.
        timeout=10,
    )
    if not resp.ok:
        # The URL embeds the token, so only status and body are printed.
        print(
            f"Telegram sendMessage failed: HTTP {resp.status_code} {resp.text}",
            file=sys.stderr,
        )
def main():
    """Parse the command line, fetch the forecast and send the report.

    Command line:
        query: Place name to look up; defaults to "casa".
        --chat_id: Destination chat; ignored when --debug is given.
        --debug: Force delivery to ``ADMIN_CHAT_ID``.

    A lookup or forecast failure is reported to the destination chat instead
    of the report.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("query", nargs="?", default="casa")
    parser.add_argument("--chat_id")
    parser.add_argument("--debug", action="store_true")
    args = parser.parse_args()

    token = get_bot_token()
    dest_chat = args.chat_id if args.chat_id and not args.debug else ADMIN_CHAT_ID

    lat, lon, name = get_coordinates(args.query)
    # Compare against None explicitly: a latitude of 0.0 is a valid result.
    if lat is None:
        return send_telegram(f"'{args.query}' non trovato.", dest_chat, token)

    data = get_weather(lat, lon)
    if not data:
        return send_telegram("❌ Errore dati meteo.", dest_chat, token)

    send_telegram(format_report(data, name), dest_chat, token)
# Run the command-line entry point only when executed as a script,
# not when the module is imported.
if __name__ == "__main__":
    main()