#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import datetime
import html
import json
import logging
import os
import time
from logging.handlers import RotatingFileHandler
from typing import Dict, List, Optional, Tuple
from zoneinfo import ZoneInfo
import requests
from dateutil import parser
# =============================================================================
# SEVERE WEATHER ALERT (next 24h) - Casa (LAT/LON)
# - Freezing rain/drizzle (WMO codes 56,57,66,67) -> high priority, a single occurrence is enough
# - Wind gusts persistence: >= threshold for at least 2 consecutive hours
# - Rain persistence: threshold (mm/3h) exceeded for at least 2 hours (2 consecutive 3h windows)
#
# Telegram token: NOT in clear.
# Read order:
# 1) env TELEGRAM_BOT_TOKEN
# 2) ~/.telegram_dpc_bot_token
# 3) /etc/telegram_dpc_bot_token
#
# Debug:
# DEBUG=1 python3 severe_weather.py
#
# Log:
# ./weather_alert.log (same folder as this script)
# =============================================================================
# True only when the DEBUG environment variable is exactly "1" (after stripping whitespace).
DEBUG = os.environ.get("DEBUG", "0").strip() == "1"
# ----------------- TELEGRAM -----------------
# Chat IDs that receive every notification.
TELEGRAM_CHAT_IDS = ["64463169", "24827341", "132455422", "5405962012"]
# Token file locations, tried in this order after the TELEGRAM_BOT_TOKEN env var.
TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token")
TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token"
# ----------------- LOCATION -----------------
LAT = 43.9356
LON = 12.4296
# ----------------- THRESHOLDS -----------------
# Wind gusts (km/h) - thresholds as per your configured set
WIND_YELLOW = 62.0
WIND_ORANGE = 75.0
WIND_RED = 88.0
# Rain: mm in 3 hours
RAIN_3H_LIMIT = 25.0
# Minimum required persistence (hours)
PERSIST_HOURS = 2 # user request: >= 2 hours
# Freezing rain/drizzle codes
FREEZING_CODES = {56, 57, 66, 67}
# ----------------- HORIZON -----------------
# Number of hourly forecast steps inspected on each run.
HOURS_AHEAD = 24
# ----------------- FILES -----------------
STATE_FILE = "/home/daniely/docker/telegram-bot/weather_state.json"
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
LOG_FILE = os.path.join(BASE_DIR, "weather_alert.log")
# ----------------- OPEN-METEO -----------------
OPEN_METEO_URL = "https://api.open-meteo.com/v1/forecast"
TZ = "Europe/Rome"
TZINFO = ZoneInfo(TZ)
HTTP_HEADERS = {"User-Agent": "rpi-severe-weather/2.0"}
# Force model: AROME France HD 1.5 km
MODEL_PRIMARY = "meteofrance_arome_france_hd"
# Fallback (same Meteo-France family) for operational continuity
MODEL_FALLBACK = "meteofrance_seamless"
# If True, send an "all clear" message when everything drops back below threshold (this is not an error)
SEND_ALL_CLEAR = True
# =============================================================================
# LOGGING
# =============================================================================
def setup_logger() -> logging.Logger:
    """
    Build and return the "severe_weather" logger.

    Any handlers already attached to that logger are dropped first. A rotating
    file handler on LOG_FILE is always installed; when DEBUG is set a console
    handler is added as well and the logger level is lowered to DEBUG
    (otherwise INFO). Both handlers share the same line format.
    """
    log = logging.getLogger("severe_weather")
    if DEBUG:
        log.setLevel(logging.DEBUG)
    else:
        log.setLevel(logging.INFO)
    log.handlers.clear()

    line_format = logging.Formatter("%(asctime)s %(levelname)s %(message)s")

    # The file sink is unconditional; the console sink only exists in debug runs.
    sinks = [
        RotatingFileHandler(LOG_FILE, maxBytes=1_000_000, backupCount=5, encoding="utf-8"),
    ]
    if DEBUG:
        sinks.append(logging.StreamHandler())

    for sink in sinks:
        sink.setLevel(logging.DEBUG)
        sink.setFormatter(line_format)
        log.addHandler(sink)
    return log
# Module-level logger instance, created once at import time.
LOGGER = setup_logger()
# =============================================================================
# UTILS
# =============================================================================
def ensure_parent_dir(path: str) -> None:
    """
    Create the directory containing *path* when it does not exist yet.

    Does nothing when *path* has no directory component or when that
    directory path already exists.
    """
    folder = os.path.dirname(path)
    if not folder:
        return
    if os.path.exists(folder):
        return
    os.makedirs(folder, exist_ok=True)
def now_local() -> datetime.datetime:
    """Return the current timezone-aware time expressed in TZINFO."""
    return datetime.datetime.now(tz=TZINFO)
def read_text_file(path: str) -> str:
    """
    Return the stripped UTF-8 content of *path*, or "" when it cannot be read.

    A missing file is silently ignored, a permission problem is logged at
    debug level, and any other failure is logged with its traceback.
    Never raises.
    """
    content = ""
    try:
        with open(path, mode="r", encoding="utf-8") as handle:
            content = handle.read().strip()
    except FileNotFoundError:
        pass
    except PermissionError:
        LOGGER.debug("Permission denied reading %s", path)
    except Exception as exc:
        LOGGER.exception("Error reading %s: %s", path, exc)
    return content
def load_bot_token() -> str:
    """
    Resolve the Telegram bot token.

    Lookup order: the TELEGRAM_BOT_TOKEN environment variable, then
    TOKEN_FILE_HOME, then TOKEN_FILE_ETC. Returns "" when none yields a value.
    """
    from_env = os.environ.get("TELEGRAM_BOT_TOKEN", "").strip()
    if from_env:
        return from_env

    from_home = read_text_file(TOKEN_FILE_HOME)
    if from_home:
        return from_home

    from_etc = read_text_file(TOKEN_FILE_ETC)
    if not from_etc:
        return ""
    return from_etc.strip()
def parse_time_to_local(t: str) -> datetime.datetime:
    """
    Parse an ISO-8601 timestamp into a timezone-aware datetime in TZINFO.

    - A naive timestamp is interpreted as already being in TZINFO.
    - A timestamp carrying an offset is converted to TZINFO.
    """
    parsed = parser.isoparse(t)
    is_naive = parsed.tzinfo is None
    if is_naive:
        return parsed.replace(tzinfo=TZINFO)
    return parsed.astimezone(TZINFO)
def hhmm(dt: datetime.datetime) -> str:
    """Format *dt* as a zero-padded 24-hour "HH:MM" string."""
    return format(dt, "%H:%M")
# =============================================================================
# TELEGRAM
# =============================================================================
def telegram_send_html(message_html: str) -> bool:
    """
    Send *message_html* (Telegram HTML parse mode) to every chat in TELEGRAM_CHAT_IDS.

    Never raises. Returns True if at least one chat_id succeeded.
    IMPORTANT: called only on REAL ALERTS (not on errors).

    The bot token is part of the request URL, so it is redacted from any
    exception text before that text is written to the log.
    """
    token = load_bot_token()
    if not token:
        LOGGER.warning("Telegram token missing: message not sent.")
        return False
    url = f"https://api.telegram.org/bot{token}/sendMessage"
    base_payload = {
        "text": message_html,
        "parse_mode": "HTML",
        "disable_web_page_preview": True,
    }
    sent_ok = False
    with requests.Session() as s:
        for chat_id in TELEGRAM_CHAT_IDS:
            payload = dict(base_payload)
            payload["chat_id"] = chat_id
            try:
                resp = s.post(url, json=payload, timeout=15)
                if resp.status_code == 200:
                    sent_ok = True
                else:
                    LOGGER.error("Telegram error chat_id=%s status=%s body=%s",
                                 chat_id, resp.status_code, resp.text[:500])
                # Short pause between consecutive sends.
                time.sleep(0.25)
            except Exception as e:
                # Connection-level error messages can include the request URL
                # (and therefore the token). Log a redacted one-line summary
                # instead of a traceback, which would repeat the raw message.
                safe_err = f"{type(e).__name__}: {e}".replace(token, "<redacted>")
                LOGGER.error("Telegram exception chat_id=%s err=%s", chat_id, safe_err)
    return sent_ok
# =============================================================================
# STATE
# =============================================================================
def load_state() -> Dict:
    """
    Load the persisted alert state from STATE_FILE.

    Starts from a dictionary of defaults and overlays whatever the JSON file
    contains. A missing file yields the defaults; a read or parse failure is
    logged and the defaults (with whatever was merged so far) are returned.
    """
    state = {
        "alert_active": False,
        "wind_level": 0,
        "last_wind_peak": 0.0,
        "last_rain_3h": 0.0,
        "freezing_active": False,
    }
    if not os.path.exists(STATE_FILE):
        return state
    try:
        with open(STATE_FILE, mode="r", encoding="utf-8") as handle:
            stored = json.load(handle) or {}
            state.update(stored)
    except Exception as exc:
        LOGGER.exception("State read error: %s", exc)
    return state
def save_state(state: Dict) -> None:
    """
    Write *state* to STATE_FILE as indented UTF-8 JSON.

    The parent directory is created when missing. Any failure is logged and
    swallowed, so this function never raises.
    """
    try:
        ensure_parent_dir(STATE_FILE)
        with open(STATE_FILE, mode="w", encoding="utf-8") as handle:
            json.dump(state, handle, indent=2, ensure_ascii=False)
    except Exception as exc:
        LOGGER.exception("State write error: %s", exc)
# =============================================================================
# OPEN-METEO
# =============================================================================
def fetch_forecast(models_value: str) -> Optional[Dict]:
    """
    Request the hourly forecast from Open-Meteo for the given model name.

    Returns the decoded JSON payload, or None on any failure. An HTTP 400 is
    logged with the API-provided "reason" when the body is JSON (otherwise
    with the first 500 characters of the body); every other error is logged
    with its traceback. Never raises and never notifies Telegram.
    """
    query = {
        "latitude": LAT,
        "longitude": LON,
        "hourly": "precipitation,wind_gusts_10m,weather_code",
        "timezone": TZ,
        "forecast_days": 2,
        "wind_speed_unit": "kmh",
        "precipitation_unit": "mm",
        "models": models_value,
    }
    try:
        response = requests.get(OPEN_METEO_URL, params=query, headers=HTTP_HEADERS, timeout=25)
        if response.status_code != 400:
            response.raise_for_status()
            return response.json()
        # HTTP 400: log the reason if present; no Telegram on errors.
        try:
            detail = response.json()
            LOGGER.error("Open-Meteo 400 (models=%s): %s", models_value, detail.get("reason", detail))
        except Exception:
            LOGGER.error("Open-Meteo 400 (models=%s): %s", models_value, response.text[:500])
        return None
    except Exception as exc:
        LOGGER.exception("Open-Meteo request error (models=%s): %s", models_value, exc)
        return None
def get_forecast() -> Tuple[Optional[Dict], str]:
    """
    Fetch the forecast from MODEL_PRIMARY, falling back to MODEL_FALLBACK.

    Returns (payload, model_name_used). When both requests fail the result is
    (None, MODEL_PRIMARY).
    """
    LOGGER.debug("Requesting Open-Meteo with models=%s", MODEL_PRIMARY)
    primary = fetch_forecast(MODEL_PRIMARY)
    if primary is None:
        LOGGER.warning("Primary model failed (%s). Trying fallback=%s", MODEL_PRIMARY, MODEL_FALLBACK)
        fallback = fetch_forecast(MODEL_FALLBACK)
        if fallback is None:
            return None, MODEL_PRIMARY
        return fallback, MODEL_FALLBACK
    return primary, MODEL_PRIMARY
# =============================================================================
# PERSISTENCE LOGIC
# =============================================================================
def best_wind_persistent_level(
    gusts: List[float],
    times: List[str],
    start_idx: int,
    end_idx: int,
    persist_hours: int
) -> Tuple[int, float, str, int]:
    """
    Find the most severe wind level whose gust threshold is held persistently.

    Levels are tried from most to least severe (3 = WIND_RED, 2 = WIND_ORANGE,
    1 = WIND_YELLOW). A level qualifies when gusts stay >= its threshold for at
    least `persist_hours` consecutive hourly points inside [start_idx, end_idx).
    Only the FIRST qualifying run of the winning level is reported, so the
    start time, peak and length always describe the same run.

    Args:
        gusts: hourly gust values; entries that are missing or not convertible
            to float count as 0.0.
        times: timestamps aligned with `gusts`.
        start_idx: first index inspected (inclusive).
        end_idx: last index inspected (exclusive).
        persist_hours: minimum run length; values below 1 are treated as 1.

    Returns:
        (level, peak_gust_within_run, run_start_hhmm, run_length_hours)
        Level 0 means not persistent above yellow: (0, 0.0, "", 0).
    """
    # A non-positive requirement would make an empty run "qualify".
    required = max(1, persist_hours)

    def gust_at(i: int) -> float:
        try:
            return float(gusts[i])
        except (TypeError, ValueError, IndexError):
            return 0.0

    def first_run(threshold: float) -> Optional[Tuple[int, float, int]]:
        """Return (start_index, peak, length) of the first qualifying run, or None."""
        run_start: Optional[int] = None
        run_peak = 0.0
        run_len = 0
        for i in range(start_idx, end_idx):
            v = gust_at(i)
            if v >= threshold:
                if run_len == 0:
                    run_start = i
                    run_peak = v
                else:
                    run_peak = max(run_peak, v)
                run_len += 1
                continue
            if run_len >= required:
                # The first qualifying run has just ended: stop here so a later,
                # separate run cannot leak into its peak or length.
                break
            run_start, run_peak, run_len = None, 0.0, 0
        if run_start is not None and run_len >= required:
            return run_start, run_peak, run_len
        return None

    for level, threshold in ((3, WIND_RED), (2, WIND_ORANGE), (1, WIND_YELLOW)):
        found = first_run(threshold)
        if found is not None:
            run_start, peak, length = found
            # Only the run's start timestamp is needed, so only that one is parsed.
            return level, peak, hhmm(parse_time_to_local(times[run_start])), length
    return 0, 0.0, "", 0
def best_rain_persistent_3h(
    rain: List[float],
    times: List[str],
    start_idx: int,
    end_idx: int,
    limit_3h: float,
    persist_hours: int
) -> Tuple[float, str, int]:
    """
    Detect persistent heavy rain using rolling 3-hour precipitation sums.

    For every window start i in [start_idx, end_idx - 2) the sum of
    rain[i], rain[i + 1] and rain[i + 2] is computed. Persistence means at
    least `persist_hours` consecutive windows with a sum >= `limit_3h`; each
    window is shifted by one hour, so the count approximates how long the
    intense conditions last. Only the FIRST persistent run is reported, so the
    start time, maximum and length always describe the same run.

    Args:
        rain: hourly precipitation totals; a window containing a missing or
            non-numeric value counts as 0.0.
        times: timestamps aligned with `rain`.
        start_idx: first index inspected (inclusive).
        end_idx: last index inspected (exclusive).
        limit_3h: threshold for a single 3-hour sum.
        persist_hours: minimum number of consecutive exceeding windows;
            values below 1 are treated as 1.

    Returns:
        (max_3h_sum_within_run, run_start_hhmm, persistence_hours), or
        (0.0, "", 0) when fewer than 3 points are available or no persistent
        run exists.
    """
    if end_idx - start_idx < 3:
        return 0.0, "", 0

    required = max(1, persist_hours)

    def window_sum(i: int) -> float:
        try:
            return float(rain[i]) + float(rain[i + 1]) + float(rain[i + 2])
        except (TypeError, ValueError, IndexError):
            return 0.0

    run_start = None
    run_max = 0.0
    run_len = 0
    for i in range(start_idx, end_idx - 2):
        total = window_sum(i)
        if total >= limit_3h:
            if run_len == 0:
                run_start = i
                run_max = total
            else:
                run_max = max(run_max, total)
            run_len += 1
            continue
        if run_len >= required:
            # The first persistent run has just ended: stop here so a later,
            # separate run cannot leak into its maximum or length.
            break
        run_start, run_max, run_len = None, 0.0, 0

    if run_start is None or run_len < required:
        return 0.0, "", 0
    # Only the run's start timestamp is needed, so only that one is parsed.
    return run_max, hhmm(parse_time_to_local(times[run_start])), run_len
# =============================================================================
# MESSAGE BUILDERS
# =============================================================================
def wind_message(level: int, peak: float, start_hhmm: str, run_len: int) -> str:
    """
    Build the wind section of the alert text.

    Level 3 and level 2 get their own icon, title and threshold; any other
    level uses the yellow presentation. `run_len` is the number of consecutive
    hourly points in the run, and `start_hhmm` falls back to a dash when empty.
    """
    presentation = {
        3: ("🔴", "TEMPESTA (Burrasca fortissima)", WIND_RED),
        2: ("🟠", "VENTO MOLTO FORTE", WIND_ORANGE),
    }
    icon, title, thr = presentation.get(level, ("🟡", "VENTO FORTE", WIND_YELLOW))
    rows = [
        f"{icon} {title}",
        f"Persistenza: ≥ {PERSIST_HOURS} ore sopra soglia ({thr:.0f} km/h).",
        f"🕒 Inizio stimato: {html.escape(start_hhmm or '—')}",
        f"📈 Picco in finestra: {peak:.0f} km/h (run ~{run_len}h).",
    ]
    return "\n".join(rows)
def rain_message(max_3h: float, start_hhmm: str, persist_h: int) -> str:
    """
    Build the rain section of the alert text.

    Reports the configured 3-hour threshold, the estimated start time (a dash
    when empty), the maximum 3-hour sum and the persistence in hours.
    """
    rows = [
        "🌧️ PIOGGIA INTENSA",
        f"Persistenza: ≥ {PERSIST_HOURS} ore sopra soglia ({RAIN_3H_LIMIT:.1f} mm/3h).",
        f"🕒 Inizio stimato: {html.escape(start_hhmm or '—')}",
        f"📈 Max su 3 ore in finestra: {max_3h:.1f} mm (persistenza ~{persist_h}h).",
    ]
    return "\n".join(rows)
# =============================================================================
# MAIN
# =============================================================================
def analyze() -> None:
    """
    Run one severe-weather check over the next HOURS_AHEAD hourly steps.

    Flow:
    - Fetch the forecast with get_forecast(); on failure return without notifying.
    - Trim the hourly series to a common length and locate the first timestep
      at or after the current local time.
    - Within that window detect freezing precipitation codes, persistent wind
      gusts and persistent 3-hour rain.
    - Compare the findings with the saved state to decide whether to send a new
      alert, an all-clear, or nothing, then save the updated state.
    """
    LOGGER.info("--- Controllo Meteo Severo (Wind/Rain/Ice) ---")
    data, model_used = get_forecast()
    if not data:
        # No Telegram on errors
        return
    hourly = (data.get("hourly", {}) or {})
    times = hourly.get("time", []) or []
    gusts = hourly.get("wind_gusts_10m", []) or []
    rain = hourly.get("precipitation", []) or []
    wcode = hourly.get("weather_code", []) or []
    # Trim every series to the shortest one so all indices stay aligned.
    n = min(len(times), len(gusts), len(rain), len(wcode))
    if n == 0:
        LOGGER.error("Empty hourly series (model=%s).", model_used)
        return
    times = times[:n]
    gusts = gusts[:n]
    rain = rain[:n]
    wcode = wcode[:n]
    now = now_local()
    state = load_state()
    # Whether the previous run left an alert active; gates repeat notifications below.
    was_alarm = bool(state.get("alert_active", False))
    # Find starting index: first timestep >= now
    start_idx = -1
    for i, t in enumerate(times):
        if parse_time_to_local(t) >= now:
            start_idx = i
            break
    if start_idx == -1:
        LOGGER.error("Could not locate current time index in forecast timeline.")
        return
    end_idx = min(start_idx + HOURS_AHEAD, n)
    if end_idx <= start_idx:
        LOGGER.error("Invalid horizon window (start=%s end=%s).", start_idx, end_idx)
        return
    if DEBUG:
        LOGGER.debug("model=%s start_idx=%s end_idx=%s (hours=%s)",
                     model_used, start_idx, end_idx, end_idx - start_idx)
    # --- Freezing detection (no persistence needed) ---
    freezing_detected = False
    freezing_time = ""
    for i in range(start_idx, end_idx):
        try:
            code = int(wcode[i])
        except Exception:
            # Missing or non-numeric code: use a value that matches no freezing code.
            code = -1
        if code in FREEZING_CODES:
            freezing_detected = True
            if not freezing_time:
                freezing_time = hhmm(parse_time_to_local(times[i]))
            # The first occurrence is enough.
            break
    # --- Wind persistence ---
    wind_level_curr, wind_peak, wind_start, wind_run_len = best_wind_persistent_level(
        gusts=gusts,
        times=times,
        start_idx=start_idx,
        end_idx=end_idx,
        persist_hours=PERSIST_HOURS
    )
    # --- Rain persistence (3h windows) ---
    rain_max_3h, rain_start, rain_persist = best_rain_persistent_3h(
        rain=rain,
        times=times,
        start_idx=start_idx,
        end_idx=end_idx,
        limit_3h=RAIN_3H_LIMIT,
        persist_hours=PERSIST_HOURS
    )
    # --- Decide notifications ---
    alerts: List[str] = []
    should_notify = False
    # 1) Freezing rain (priority)
    if freezing_detected:
        # Notify only on the transition from "not active" to "active".
        if not bool(state.get("freezing_active", False)):
            alerts.append(
                "🧊 ALLARME GELICIDIO\n"
                "Prevista pioggia che gela (freezing rain/drizzle).\n"
                f"🕒 Inizio stimato: {html.escape(freezing_time or '—')}\n"
                "Pericolo ghiaccio su strada."
            )
            should_notify = True
        state["freezing_active"] = True
    else:
        state["freezing_active"] = False
    # 2) Wind (persistent)
    if wind_level_curr > 0:
        prev_level = int(state.get("wind_level", 0) or 0)
        # Notify when no alert was active before, or when the level got worse.
        if (not was_alarm) or (wind_level_curr > prev_level):
            alerts.append(wind_message(wind_level_curr, wind_peak, wind_start, wind_run_len))
            should_notify = True
        state["wind_level"] = wind_level_curr
        state["last_wind_peak"] = float(wind_peak)
    else:
        state["wind_level"] = 0
        state["last_wind_peak"] = 0.0
    # 3) Rain (persistent)
    if rain_persist >= PERSIST_HOURS and rain_max_3h >= RAIN_3H_LIMIT:
        prev_rain = float(state.get("last_rain_3h", 0.0) or 0.0)
        # "Better one too many": notify on the first persistent exceedance,
        # and re-notify if the 3h maximum worsens by >= +10 mm
        if (not was_alarm) or (rain_max_3h >= prev_rain + 10.0):
            alerts.append(rain_message(rain_max_3h, rain_start, rain_persist))
            should_notify = True
        state["last_rain_3h"] = float(rain_max_3h)
    else:
        state["last_rain_3h"] = 0.0
    # True when any of the three hazards is currently forecast.
    is_alarm_now = (
        freezing_detected
        or (wind_level_curr > 0)
        or (rain_persist >= PERSIST_HOURS and rain_max_3h >= RAIN_3H_LIMIT)
    )
    # --- Send only on alerts (never on errors) ---
    if should_notify and alerts:
        headline = "⚠️ AVVISO METEO SEVERO"
        meta = (
            f"📍 Casa (LAT {LAT:.4f}, LON {LON:.4f})\n"
            f"🕒 Finestra: prossime {HOURS_AHEAD} ore\n"
            f"🛰️ Modello: {html.escape(model_used)}\n"
            f"⏱️ Persistenza minima: {PERSIST_HOURS} ore\n"
        )
        body = "\n".join(alerts)
        footer = "\nFonte dati: Open-Meteo"
        msg = f"{headline}\n{meta}\n{body}{footer}"
        ok = telegram_send_html(msg)
        if ok:
            LOGGER.info("Alert sent successfully.")
        else:
            LOGGER.warning("Alert NOT sent (token missing or Telegram error).")
        # The alert is marked active even when delivery failed.
        state["alert_active"] = True
        save_state(state)
        return
    # Optional: cleared message (transition only)
    if SEND_ALL_CLEAR and was_alarm and (not is_alarm_now):
        msg = (
            "🟢 ALLERTA METEO RIENTRATA\n"
            "Condizioni rientrate sotto le soglie di guardia.\n"
            f"🕒 Finestra: prossime {HOURS_AHEAD} ore\n"
            f"🛰️ Modello: {html.escape(model_used)}\n"
            f"⏱️ Persistenza minima: {PERSIST_HOURS} ore"
        )
        ok = telegram_send_html(msg)
        if ok:
            LOGGER.info("All-clear sent successfully.")
        else:
            LOGGER.warning("All-clear NOT sent (token missing or Telegram error).")
        # Reset the persisted state to its defaults.
        state = {
            "alert_active": False,
            "wind_level": 0,
            "last_wind_peak": 0.0,
            "last_rain_3h": 0.0,
            "freezing_active": False,
        }
        save_state(state)
        return
    # No new alert
    state["alert_active"] = bool(is_alarm_now)
    save_state(state)
    LOGGER.info(
        "No new alert. model=%s wind_level=%s rain3h=%.1fmm(persist=%sh) ice=%s",
        model_used, wind_level_curr, rain_max_3h, rain_persist, freezing_detected
    )
# Script entry point: run a single check when executed directly.
if __name__ == "__main__":
    analyze()