Compare commits
4 Commits
db7f479e71
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 1ccb77c7ae | |||
| d06a8eddc3 | |||
| 7f16f53028 | |||
| 7cf8e285b6 |
@@ -18,7 +18,6 @@ NAS_USER = "daniely"
|
|||||||
MASTER_IP = "192.168.128.80"
|
MASTER_IP = "192.168.128.80"
|
||||||
|
|
||||||
# --- LISTE DISPOSITIVI ---
|
# --- LISTE DISPOSITIVI ---
|
||||||
# Core: Dispositivi intelligenti (SSH, CPU, RAM)
|
|
||||||
CORE_DEVICES = [
|
CORE_DEVICES = [
|
||||||
{"name": "🍓 Pi-1 (Master)", "ip": MASTER_IP, "type": "pi", "user": SSH_USER},
|
{"name": "🍓 Pi-1 (Master)", "ip": MASTER_IP, "type": "pi", "user": SSH_USER},
|
||||||
{"name": "🍓 Pi-2 (Backup)", "ip": "127.0.0.1", "type": "local", "user": ""},
|
{"name": "🍓 Pi-2 (Backup)", "ip": "127.0.0.1", "type": "local", "user": ""},
|
||||||
@@ -26,7 +25,6 @@ CORE_DEVICES = [
|
|||||||
{"name": "🗄️ NAS 214", "ip": "192.168.128.90", "type": "nas", "user": NAS_USER}
|
{"name": "🗄️ NAS 214", "ip": "192.168.128.90", "type": "nas", "user": NAS_USER}
|
||||||
]
|
]
|
||||||
|
|
||||||
# Infra: Solo Ping e Riavvio (Switch, WiFi)
|
|
||||||
INFRA_DEVICES = [
|
INFRA_DEVICES = [
|
||||||
{"name": "📡 Router", "ip": "192.168.128.1"},
|
{"name": "📡 Router", "ip": "192.168.128.1"},
|
||||||
{"name": "📶 WiFi Sala", "ip": "192.168.128.101"},
|
{"name": "📶 WiFi Sala", "ip": "192.168.128.101"},
|
||||||
@@ -47,27 +45,41 @@ logger = logging.getLogger(__name__)
|
|||||||
def run_cmd(command, ip=None, user=None):
|
def run_cmd(command, ip=None, user=None):
|
||||||
try:
|
try:
|
||||||
if ip == "127.0.0.1" or ip is None:
|
if ip == "127.0.0.1" or ip is None:
|
||||||
return subprocess.check_output(command, shell=True, stderr=subprocess.STDOUT).decode('utf-8').strip()
|
return subprocess.check_output(command, shell=True, stderr=subprocess.STDOUT, timeout=5).decode('utf-8').strip()
|
||||||
else:
|
else:
|
||||||
safe_cmd = command.replace("'", "'\\''")
|
safe_cmd = command.replace("'", "'\\''")
|
||||||
full_cmd = f"ssh -o LogLevel=ERROR -o BatchMode=yes -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o ConnectTimeout=4 {user}@{ip} '{safe_cmd}'"
|
full_cmd = f"ssh -o LogLevel=ERROR -o BatchMode=yes -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o ConnectTimeout=3 {user}@{ip} '{safe_cmd}'"
|
||||||
return subprocess.check_output(full_cmd, shell=True, stderr=subprocess.STDOUT).decode('utf-8').strip()
|
return subprocess.check_output(full_cmd, shell=True, stderr=subprocess.STDOUT, timeout=8).decode('utf-8').strip()
|
||||||
except Exception: return "Err"
|
except Exception: return "Err"
|
||||||
|
|
||||||
def get_ping_icon(ip):
|
def get_ping_icon(ip):
|
||||||
|
print(f"DEBUG: Pinging {ip}...") # LOG PER CAPIRE DOVE SI BLOCCA
|
||||||
try:
|
try:
|
||||||
# Ping ultra-rapido (Timeout 1s)
|
# Timeout aggressivo: 0.5 secondi (-W 1 è il minimo di ping standard, ma Python taglia a 0.8)
|
||||||
subprocess.check_call(["ping", "-c", "1", "-W", "1", ip], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
subprocess.run(
|
||||||
|
["ping", "-c", "1", "-W", "1", ip],
|
||||||
|
stdout=subprocess.DEVNULL,
|
||||||
|
stderr=subprocess.DEVNULL,
|
||||||
|
timeout=0.8, # Timeout Python brutale
|
||||||
|
check=True
|
||||||
|
)
|
||||||
return "✅"
|
return "✅"
|
||||||
except: return "🔴"
|
except subprocess.TimeoutExpired:
|
||||||
|
return "🔴" # Timeout Python
|
||||||
|
except subprocess.CalledProcessError:
|
||||||
|
return "🔴" # Risposta "Host Unreachable"
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Errore Ping {ip}: {e}")
|
||||||
|
return "❓"
|
||||||
|
|
||||||
def get_device_stats(device):
|
def get_device_stats(device):
|
||||||
ip, user, dtype = device['ip'], device['user'], device['type']
|
ip, user, dtype = device['ip'], device['user'], device['type']
|
||||||
uptime_raw = run_cmd("uptime -p 2>/dev/null || uptime", ip, user)
|
uptime_raw = run_cmd("uptime -p 2>/dev/null || uptime", ip, user)
|
||||||
if not uptime_raw or "Err" in uptime_raw: return "🔴 **OFFLINE**"
|
if not uptime_raw or "Err" in uptime_raw: return "🔴 **OFFLINE**"
|
||||||
uptime = uptime_raw.replace("up ", "").split(", load")[0].split(", ")[0]
|
|
||||||
|
|
||||||
|
uptime = uptime_raw.replace("up ", "").split(", load")[0].split(", ")[0]
|
||||||
temp = "N/A"
|
temp = "N/A"
|
||||||
|
|
||||||
if dtype in ["pi", "local"]:
|
if dtype in ["pi", "local"]:
|
||||||
t = run_cmd("cat /sys/class/thermal/thermal_zone0/temp", ip, user)
|
t = run_cmd("cat /sys/class/thermal/thermal_zone0/temp", ip, user)
|
||||||
if t.isdigit(): temp = f"{int(t)/1000:.1f}°C"
|
if t.isdigit(): temp = f"{int(t)/1000:.1f}°C"
|
||||||
@@ -78,6 +90,7 @@ def get_device_stats(device):
|
|||||||
|
|
||||||
if dtype == "nas": ram_cmd = "free | grep Mem | awk '{printf \"%.0f%%\", $3*100/$2}'"
|
if dtype == "nas": ram_cmd = "free | grep Mem | awk '{printf \"%.0f%%\", $3*100/$2}'"
|
||||||
else: ram_cmd = "free -m | awk 'NR==2{if ($2>0) printf \"%.0f%%\", $3*100/$2; else print \"0%\"}'"
|
else: ram_cmd = "free -m | awk 'NR==2{if ($2>0) printf \"%.0f%%\", $3*100/$2; else print \"0%\"}'"
|
||||||
|
|
||||||
disk_path = "/" if dtype != "nas" else "/volume1"
|
disk_path = "/" if dtype != "nas" else "/volume1"
|
||||||
disk_cmd = f"df -h {disk_path} | awk 'NR==2{{print $5}}'"
|
disk_cmd = f"df -h {disk_path} | awk 'NR==2{{print $5}}'"
|
||||||
|
|
||||||
@@ -92,7 +105,7 @@ def run_speedtest():
|
|||||||
try: return subprocess.check_output("speedtest-cli --simple", shell=True, timeout=50).decode('utf-8')
|
try: return subprocess.check_output("speedtest-cli --simple", shell=True, timeout=50).decode('utf-8')
|
||||||
except: return "Errore Speedtest"
|
except: return "Errore Speedtest"
|
||||||
|
|
||||||
# --- GESTIONE MENU ---
|
# --- BOT HANDLERS ---
|
||||||
def restricted(func):
|
def restricted(func):
|
||||||
@wraps(func)
|
@wraps(func)
|
||||||
async def wrapped(update: Update, context: ContextTypes.DEFAULT_TYPE, *args, **kwargs):
|
async def wrapped(update: Update, context: ContextTypes.DEFAULT_TYPE, *args, **kwargs):
|
||||||
@@ -102,18 +115,16 @@ def restricted(func):
|
|||||||
|
|
||||||
@restricted
|
@restricted
|
||||||
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||||
# MENU PRINCIPALE
|
|
||||||
keyboard = [
|
keyboard = [
|
||||||
[InlineKeyboardButton("🖥️ Core Server", callback_data="menu_core"), InlineKeyboardButton("🔍 Scan LAN", callback_data="menu_lan")],
|
[InlineKeyboardButton("🖥️ Core Server", callback_data="menu_core"), InlineKeyboardButton("🔍 Scan LAN", callback_data="menu_lan")],
|
||||||
[InlineKeyboardButton("🛡️ Pi-hole", callback_data="menu_pihole"), InlineKeyboardButton("🌐 Rete", callback_data="menu_net")],
|
[InlineKeyboardButton("🛡️ Pi-hole", callback_data="menu_pihole"), InlineKeyboardButton("🌐 Rete", callback_data="menu_net")],
|
||||||
[InlineKeyboardButton("📜 Logs", callback_data="menu_logs")]
|
[InlineKeyboardButton("📜 Logs", callback_data="menu_logs")]
|
||||||
]
|
]
|
||||||
text = "🎛 **Loogle Control Center**\nSeleziona un pannello:"
|
text = "🎛 **Loogle Control Center v6.2**\nSeleziona un pannello:"
|
||||||
|
|
||||||
if update.message:
|
if update.message:
|
||||||
await update.message.reply_text(text, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="Markdown")
|
await update.message.reply_text(text, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="Markdown")
|
||||||
else:
|
else:
|
||||||
# Se chiamato da un pulsante "Indietro", modifica il messaggio esistente
|
|
||||||
await update.callback_query.edit_message_text(text, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="Markdown")
|
await update.callback_query.edit_message_text(text, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="Markdown")
|
||||||
|
|
||||||
@restricted
|
@restricted
|
||||||
@@ -124,7 +135,6 @@ async def button_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) ->
|
|||||||
|
|
||||||
if data == "main_menu": await start(update, context)
|
if data == "main_menu": await start(update, context)
|
||||||
|
|
||||||
# --- MENU CORE ---
|
|
||||||
elif data == "menu_core":
|
elif data == "menu_core":
|
||||||
keyboard = []
|
keyboard = []
|
||||||
for i, dev in enumerate(CORE_DEVICES): keyboard.append([InlineKeyboardButton(dev['name'], callback_data=f"stat_{i}")])
|
for i, dev in enumerate(CORE_DEVICES): keyboard.append([InlineKeyboardButton(dev['name'], callback_data=f"stat_{i}")])
|
||||||
@@ -143,42 +153,43 @@ async def button_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) ->
|
|||||||
await query.edit_message_text(f"⏳ Controllo {dev['name']}...", parse_mode="Markdown")
|
await query.edit_message_text(f"⏳ Controllo {dev['name']}...", parse_mode="Markdown")
|
||||||
await query.edit_message_text(f"🔹 **{dev['name']}**\n\n{get_device_stats(dev)}", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("⬅️ Indietro", callback_data="menu_core")]]), parse_mode="Markdown")
|
await query.edit_message_text(f"🔹 **{dev['name']}**\n\n{get_device_stats(dev)}", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("⬅️ Indietro", callback_data="menu_core")]]), parse_mode="Markdown")
|
||||||
|
|
||||||
# --- MENU LAN (DIAGNOSTICA) ---
|
|
||||||
elif data == "menu_lan":
|
elif data == "menu_lan":
|
||||||
await query.edit_message_text("⏳ **Ping test in corso...**", parse_mode="Markdown")
|
await query.edit_message_text("⏳ **Scansione LAN rapida...**", parse_mode="Markdown")
|
||||||
|
|
||||||
report = "🔍 **DIAGNOSTICA LAN**\n\n"
|
report = "🔍 **DIAGNOSTICA LAN**\n\n"
|
||||||
|
|
||||||
# Genera tabella compatta
|
try:
|
||||||
for dev in INFRA_DEVICES:
|
# Core Devices
|
||||||
status = get_ping_icon(dev['ip'])
|
for dev in CORE_DEVICES:
|
||||||
# Aggiunge riga: Icona Nome (IP)
|
report += f"{get_ping_icon(dev['ip'])} `{dev['ip']}` - {dev['name']}\n"
|
||||||
report += f"{status} **{dev['name']}**\n`{dev['ip']}`\n"
|
|
||||||
|
report += "\n"
|
||||||
|
|
||||||
|
# Infra Devices
|
||||||
|
for dev in INFRA_DEVICES:
|
||||||
|
report += f"{get_ping_icon(dev['ip'])} `{dev['ip']}` - {dev['name']}\n"
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
report += f"\n⚠️ Errore imprevisto durante scansione: {e}"
|
||||||
|
|
||||||
keyboard = [
|
keyboard = [
|
||||||
[InlineKeyboardButton("⚡ Menu Riavvio Dispositivi", callback_data="menu_reboot")],
|
[InlineKeyboardButton("⚡ Menu Riavvio", callback_data="menu_reboot")],
|
||||||
[InlineKeyboardButton("🔄 Aggiorna", callback_data="menu_lan")],
|
[InlineKeyboardButton("🔄 Aggiorna", callback_data="menu_lan"), InlineKeyboardButton("⬅️ Indietro", callback_data="main_menu")]
|
||||||
[InlineKeyboardButton("⬅️ Indietro", callback_data="main_menu")]
|
|
||||||
]
|
]
|
||||||
await query.edit_message_text(report, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="Markdown")
|
await query.edit_message_text(report, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="Markdown")
|
||||||
|
|
||||||
# --- MENU RIAVVIO ---
|
|
||||||
elif data == "menu_reboot":
|
elif data == "menu_reboot":
|
||||||
keyboard = []
|
keyboard = []
|
||||||
for i, dev in enumerate(INFRA_DEVICES):
|
for i, dev in enumerate(INFRA_DEVICES):
|
||||||
if "Router" not in dev['name']: # Protezione router
|
if "Router" not in dev['name']: keyboard.append([InlineKeyboardButton(f"⚡ {dev['name']}", callback_data=f"reboot_{i}")])
|
||||||
keyboard.append([InlineKeyboardButton(f"⚡ {dev['name']}", callback_data=f"reboot_{i}")])
|
|
||||||
keyboard.append([InlineKeyboardButton("⬅️ Indietro", callback_data="menu_lan")])
|
keyboard.append([InlineKeyboardButton("⬅️ Indietro", callback_data="menu_lan")])
|
||||||
await query.edit_message_text("⚠️ **RIAVVIO REMOTO**\nRichiede SSH configurato sul dispositivo target (admin).", reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="Markdown")
|
await query.edit_message_text("⚠️ **RIAVVIO REMOTO**\nFunziona solo se il dispositivo supporta SSH e hai le chiavi.", reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="Markdown")
|
||||||
|
|
||||||
elif data.startswith("reboot_"):
|
elif data.startswith("reboot_"):
|
||||||
dev = INFRA_DEVICES[int(data.split("_")[1])]
|
dev = INFRA_DEVICES[int(data.split("_")[1])]
|
||||||
# Tenta riavvio con utente 'admin' (standard per Zyxel/AP)
|
|
||||||
res = run_cmd("reboot", dev['ip'], "admin")
|
res = run_cmd("reboot", dev['ip'], "admin")
|
||||||
|
|
||||||
await query.edit_message_text(f"⚡ Comando inviato a {dev['name']}...\n\nRisposta:\n`{res}`", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("⬅️ Indietro", callback_data="menu_reboot")]]), parse_mode="Markdown")
|
await query.edit_message_text(f"⚡ Comando inviato a {dev['name']}...\n\nRisposta:\n`{res}`", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("⬅️ Indietro", callback_data="menu_reboot")]]), parse_mode="Markdown")
|
||||||
|
|
||||||
# --- ALTRI MENU (Pi-hole, Net, Logs) ---
|
# --- ALTRI MENU STANDARD ---
|
||||||
elif data == "menu_pihole":
|
elif data == "menu_pihole":
|
||||||
status_raw = run_cmd("sudo pihole status", MASTER_IP, SSH_USER)
|
status_raw = run_cmd("sudo pihole status", MASTER_IP, SSH_USER)
|
||||||
icon = "✅" if "Enabled" in status_raw or "enabled" in status_raw else "🔴"
|
icon = "✅" if "Enabled" in status_raw or "enabled" in status_raw else "🔴"
|
||||||
@@ -198,7 +209,7 @@ async def button_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) ->
|
|||||||
await query.edit_message_text(f"🌐 **Rete**\n🌍 IP: `{ip}`", reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="Markdown")
|
await query.edit_message_text(f"🌐 **Rete**\n🌍 IP: `{ip}`", reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="Markdown")
|
||||||
|
|
||||||
elif data == "net_speedtest":
|
elif data == "net_speedtest":
|
||||||
await query.edit_message_text("🚀 **Speedtest...**", parse_mode="Markdown")
|
await query.edit_message_text("🚀 **Speedtest in corso...**", parse_mode="Markdown")
|
||||||
res = run_speedtest()
|
res = run_speedtest()
|
||||||
await query.edit_message_text(f"🚀 **Risultato:**\n\n`{res}`", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("⬅️ Indietro", callback_data="menu_net")]]), parse_mode="Markdown")
|
await query.edit_message_text(f"🚀 **Risultato:**\n\n`{res}`", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("⬅️ Indietro", callback_data="menu_net")]]), parse_mode="Markdown")
|
||||||
|
|
||||||
|
|||||||
134
services/telegram-bot/freeze_alert.py
Normal file
134
services/telegram-bot/freeze_alert.py
Normal file
@@ -0,0 +1,134 @@
|
|||||||
|
import requests
|
||||||
|
import datetime
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
from dateutil import parser
|
||||||
|
from zoneinfo import ZoneInfo
|
||||||
|
|
||||||
|
# --- CONFIGURAZIONE UTENTE ---
|
||||||
|
TELEGRAM_BOT_TOKEN = "8155587974:AAF9OekvBpixtk8ZH6KoIc0L8edbhdXt7A4"
|
||||||
|
TELEGRAM_CHAT_IDS = ["64463169", "24827341", "132455422", "5405962012"]
|
||||||
|
|
||||||
|
# --- COORDINATE (Strada Cà Toro, 12 - San Marino) ---
|
||||||
|
LAT = 43.9356
|
||||||
|
LON = 12.4296
|
||||||
|
LOCATION_NAME = "🏠 Casa (Strada Cà Toro)"
|
||||||
|
|
||||||
|
# Soglia Gelo (°C)
|
||||||
|
SOGLIA_GELO = 0.0
|
||||||
|
|
||||||
|
# File di stato
|
||||||
|
STATE_FILE = "/home/daniely/docker/telegram-bot/freeze_state.json"
|
||||||
|
|
||||||
|
def load_state():
|
||||||
|
if os.path.exists(STATE_FILE):
|
||||||
|
try:
|
||||||
|
with open(STATE_FILE, 'r') as f: return json.load(f)
|
||||||
|
except: pass
|
||||||
|
return {"alert_active": False, "min_temp": 100.0}
|
||||||
|
|
||||||
|
def save_state(active, min_temp):
|
||||||
|
try:
|
||||||
|
with open(STATE_FILE, 'w') as f:
|
||||||
|
json.dump({"alert_active": active, "min_temp": min_temp, "updated": str(datetime.datetime.now())}, f)
|
||||||
|
except: pass
|
||||||
|
|
||||||
|
def send_telegram_message(message):
|
||||||
|
if not TELEGRAM_BOT_TOKEN or "INSERISCI" in TELEGRAM_BOT_TOKEN:
|
||||||
|
print(f"[TEST OUT] {message}")
|
||||||
|
return
|
||||||
|
|
||||||
|
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
|
||||||
|
for chat_id in TELEGRAM_CHAT_IDS:
|
||||||
|
try:
|
||||||
|
requests.post(url, json={"chat_id": chat_id, "text": message, "parse_mode": "Markdown"}, timeout=10)
|
||||||
|
time.sleep(0.2)
|
||||||
|
except: pass
|
||||||
|
|
||||||
|
def get_forecast():
|
||||||
|
url = "https://api.open-meteo.com/v1/forecast"
|
||||||
|
params = {
|
||||||
|
"latitude": LAT, "longitude": LON,
|
||||||
|
"hourly": "temperature_2m",
|
||||||
|
"timezone": "Europe/Rome",
|
||||||
|
"forecast_days": 3 # Prendiamo 3 giorni per coprire bene le 48h
|
||||||
|
}
|
||||||
|
try:
|
||||||
|
r = requests.get(url, params=params, timeout=10)
|
||||||
|
r.raise_for_status()
|
||||||
|
return r.json()
|
||||||
|
except: return None
|
||||||
|
|
||||||
|
def analyze_freeze():
|
||||||
|
print("--- Controllo Gelo ---")
|
||||||
|
data = get_forecast()
|
||||||
|
if not data: return
|
||||||
|
|
||||||
|
hourly = data.get("hourly", {})
|
||||||
|
times = hourly.get("time", [])
|
||||||
|
temps = hourly.get("temperature_2m", [])
|
||||||
|
|
||||||
|
now = datetime.datetime.now(ZoneInfo("Europe/Rome"))
|
||||||
|
limit_time = now + datetime.timedelta(hours=48)
|
||||||
|
|
||||||
|
min_temp_val = 100.0
|
||||||
|
min_temp_time = None
|
||||||
|
|
||||||
|
# Cerca la minima nelle prossime 48 ore
|
||||||
|
for i, t_str in enumerate(times):
|
||||||
|
t_obj = parser.isoparse(t_str).replace(tzinfo=ZoneInfo("Europe/Rome"))
|
||||||
|
|
||||||
|
# Filtra solo futuro prossimo (da adesso a +48h)
|
||||||
|
if t_obj > now and t_obj <= limit_time:
|
||||||
|
temp = temps[i]
|
||||||
|
if temp < min_temp_val:
|
||||||
|
min_temp_val = temp
|
||||||
|
min_temp_time = t_obj
|
||||||
|
|
||||||
|
# --- LOGICA ALLARME ---
|
||||||
|
state = load_state()
|
||||||
|
was_active = state.get("alert_active", False)
|
||||||
|
|
||||||
|
# C'è rischio gelo?
|
||||||
|
is_freezing = min_temp_val < SOGLIA_GELO
|
||||||
|
|
||||||
|
if is_freezing:
|
||||||
|
# Formatta orario
|
||||||
|
time_str = min_temp_time.strftime('%d/%m alle %H:%M')
|
||||||
|
|
||||||
|
# SCENARIO A: NUOVO GELO (o peggioramento significativo di 2 gradi)
|
||||||
|
if not was_active or min_temp_val < state.get("min_temp", 0) - 2.0:
|
||||||
|
msg = (
|
||||||
|
f"❄️ **ALLERTA GELO**\n"
|
||||||
|
f"📍 {LOCATION_NAME}\n\n"
|
||||||
|
f"Prevista temperatura minima di **{min_temp_val:.1f}°C**\n"
|
||||||
|
f"📅 Quando: {time_str}\n\n"
|
||||||
|
f"_Proteggere piante e tubature esterne._"
|
||||||
|
)
|
||||||
|
send_telegram_message(msg)
|
||||||
|
save_state(True, min_temp_val)
|
||||||
|
print(f"Allerta inviata: {min_temp_val}°C")
|
||||||
|
else:
|
||||||
|
print(f"Gelo già notificato ({min_temp_val}°C).")
|
||||||
|
# Aggiorniamo comunque la minima registrata nel file
|
||||||
|
save_state(True, min(min_temp_val, state.get("min_temp", 100)))
|
||||||
|
|
||||||
|
# SCENARIO B: ALLARME RIENTRATO
|
||||||
|
elif was_active and not is_freezing:
|
||||||
|
msg = (
|
||||||
|
f"☀️ **RISCHIO GELO RIENTRATO**\n"
|
||||||
|
f"📍 {LOCATION_NAME}\n\n"
|
||||||
|
f"Le previsioni per le prossime 48 ore indicano temperature sopra lo zero.\n"
|
||||||
|
f"Minima prevista: {min_temp_val:.1f}°C."
|
||||||
|
)
|
||||||
|
send_telegram_message(msg)
|
||||||
|
save_state(False, min_temp_val)
|
||||||
|
print("Allarme rientrato.")
|
||||||
|
|
||||||
|
else:
|
||||||
|
save_state(False, min_temp_val)
|
||||||
|
print(f"Nessun gelo. Minima: {min_temp_val}°C")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
analyze_freeze()
|
||||||
@@ -6,19 +6,25 @@ import time
|
|||||||
from dateutil import parser
|
from dateutil import parser
|
||||||
from zoneinfo import ZoneInfo
|
from zoneinfo import ZoneInfo
|
||||||
|
|
||||||
# --- CONFIGURAZIONE ---
|
# --- CONFIGURAZIONE UTENTE ---
|
||||||
# 👇👇 INSERISCI QUI I TUOI DATI 👇👇
|
|
||||||
TELEGRAM_BOT_TOKEN = "8155587974:AAF9OekvBpixtk8ZH6KoIc0L8edbhdXt7A4"
|
TELEGRAM_BOT_TOKEN = "8155587974:AAF9OekvBpixtk8ZH6KoIc0L8edbhdXt7A4"
|
||||||
TELEGRAM_CHAT_IDS = ["64463169", "24827341", "132455422", "5405962012"]
|
TELEGRAM_CHAT_IDS = ["64463169", "24827341", "132455422", "5405962012"]
|
||||||
|
|
||||||
# Coordinate (San Marino)
|
# Coordinate (San Marino / Casa)
|
||||||
LAT = 43.9356
|
LAT = 43.9356
|
||||||
LON = 12.4296
|
LON = 12.4296
|
||||||
|
|
||||||
# SOGLIE DI ALLARME
|
# --- SOGLIE DI ALLARME ---
|
||||||
WIND_LIMIT_WARN = 60.0 # km/h (Vento Forte)
|
# Vento (Protezione Civile Emilia-Romagna)
|
||||||
WIND_LIMIT_CRIT = 90.0 # km/h (Burrasca)
|
WIND_YELLOW = 62.0 # km/h
|
||||||
RAIN_3H_LIMIT = 30.0 # mm in 3 ore (Rischio idrogeologico)
|
WIND_ORANGE = 75.0 # km/h
|
||||||
|
WIND_RED = 88.0 # km/h
|
||||||
|
|
||||||
|
# Pioggia
|
||||||
|
RAIN_3H_LIMIT = 25.0 # mm in 3 ore
|
||||||
|
|
||||||
|
# Codici WMO per Gelicidio (Freezing Rain)
|
||||||
|
FREEZING_CODES = [56, 57, 66, 67]
|
||||||
|
|
||||||
# File di stato
|
# File di stato
|
||||||
STATE_FILE = "/home/daniely/docker/telegram-bot/weather_state.json"
|
STATE_FILE = "/home/daniely/docker/telegram-bot/weather_state.json"
|
||||||
@@ -31,7 +37,7 @@ def send_telegram_message(message):
|
|||||||
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
|
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
|
||||||
for chat_id in TELEGRAM_CHAT_IDS:
|
for chat_id in TELEGRAM_CHAT_IDS:
|
||||||
try:
|
try:
|
||||||
requests.post(url, json={"chat_id": chat_id, "text": message, "parse_mode": "Markdown"}, timeout=5)
|
requests.post(url, json={"chat_id": chat_id, "text": message, "parse_mode": "Markdown"}, timeout=10)
|
||||||
time.sleep(0.2)
|
time.sleep(0.2)
|
||||||
except: pass
|
except: pass
|
||||||
|
|
||||||
@@ -39,7 +45,7 @@ def get_forecast():
|
|||||||
url = "https://api.open-meteo.com/v1/forecast"
|
url = "https://api.open-meteo.com/v1/forecast"
|
||||||
params = {
|
params = {
|
||||||
"latitude": LAT, "longitude": LON,
|
"latitude": LAT, "longitude": LON,
|
||||||
"hourly": "precipitation,windgusts_10m",
|
"hourly": "precipitation,windgusts_10m,weathercode",
|
||||||
"models": "arome_france_hd",
|
"models": "arome_france_hd",
|
||||||
"timezone": "Europe/Rome",
|
"timezone": "Europe/Rome",
|
||||||
"forecast_days": 2
|
"forecast_days": 2
|
||||||
@@ -55,7 +61,7 @@ def load_state():
|
|||||||
try:
|
try:
|
||||||
with open(STATE_FILE, 'r') as f: return json.load(f)
|
with open(STATE_FILE, 'r') as f: return json.load(f)
|
||||||
except: pass
|
except: pass
|
||||||
return {"alert_active": False, "last_wind": 0, "last_rain": 0, "time": 0}
|
return {"alert_active": False, "last_wind": 0, "last_rain": 0, "wind_level": 0, "freezing_active": False}
|
||||||
|
|
||||||
def save_state(state):
|
def save_state(state):
|
||||||
try:
|
try:
|
||||||
@@ -63,6 +69,7 @@ def save_state(state):
|
|||||||
except: pass
|
except: pass
|
||||||
|
|
||||||
def analyze():
|
def analyze():
|
||||||
|
print("--- Controllo Meteo Severo (Wind/Rain/Ice) ---")
|
||||||
data = get_forecast()
|
data = get_forecast()
|
||||||
if not data: return
|
if not data: return
|
||||||
|
|
||||||
@@ -70,6 +77,7 @@ def analyze():
|
|||||||
times = hourly.get("time", [])
|
times = hourly.get("time", [])
|
||||||
wind = hourly.get("windgusts_10m", [])
|
wind = hourly.get("windgusts_10m", [])
|
||||||
rain = hourly.get("precipitation", [])
|
rain = hourly.get("precipitation", [])
|
||||||
|
wcode = hourly.get("weathercode", [])
|
||||||
|
|
||||||
now = datetime.datetime.now(ZoneInfo("Europe/Rome"))
|
now = datetime.datetime.now(ZoneInfo("Europe/Rome"))
|
||||||
state = load_state()
|
state = load_state()
|
||||||
@@ -90,76 +98,93 @@ def analyze():
|
|||||||
max_wind_time = ""
|
max_wind_time = ""
|
||||||
sum_rain_3h = 0.0
|
sum_rain_3h = 0.0
|
||||||
|
|
||||||
# Cerca picco vento
|
freezing_rain_detected = False
|
||||||
|
freezing_time = ""
|
||||||
|
|
||||||
|
# Cerca picchi
|
||||||
for i in range(start_idx, end_idx):
|
for i in range(start_idx, end_idx):
|
||||||
|
# Vento
|
||||||
if wind[i] > max_wind:
|
if wind[i] > max_wind:
|
||||||
max_wind = wind[i]
|
max_wind = wind[i]
|
||||||
max_wind_time = parser.isoparse(times[i]).strftime('%H:%M')
|
max_wind_time = parser.isoparse(times[i]).strftime('%H:%M')
|
||||||
|
|
||||||
# Cerca picco pioggia (sliding window 3h)
|
# Gelicidio
|
||||||
|
if wcode[i] in FREEZING_CODES:
|
||||||
|
freezing_rain_detected = True
|
||||||
|
if not freezing_time: # Prendi il primo orario
|
||||||
|
freezing_time = parser.isoparse(times[i]).strftime('%H:%M')
|
||||||
|
|
||||||
|
# Pioggia (sliding window 3h)
|
||||||
for i in range(start_idx, end_idx - 3):
|
for i in range(start_idx, end_idx - 3):
|
||||||
current_sum = sum(rain[i:i+3])
|
current_sum = sum(rain[i:i+3])
|
||||||
if current_sum > sum_rain_3h:
|
if current_sum > sum_rain_3h:
|
||||||
sum_rain_3h = current_sum
|
sum_rain_3h = current_sum
|
||||||
|
|
||||||
# --- LOGICA DI DECISIONE ---
|
# --- CLASSIFICAZIONE ---
|
||||||
|
alerts = []
|
||||||
is_wind_alarm = max_wind > WIND_LIMIT_WARN
|
should_notify = False
|
||||||
is_rain_alarm = sum_rain_3h > RAIN_3H_LIMIT
|
|
||||||
IS_ALARM_NOW = is_wind_alarm or is_rain_alarm
|
|
||||||
|
|
||||||
WAS_ALARM = state.get("alert_active", False)
|
WAS_ALARM = state.get("alert_active", False)
|
||||||
|
|
||||||
alerts = []
|
# 1. GELO (Priorità Massima)
|
||||||
|
if freezing_rain_detected:
|
||||||
|
if not state.get("freezing_active", False):
|
||||||
|
alerts.append(f"🧊 **ALLARME GELICIDIO**\nPrevista pioggia che gela (Freezing Rain).\n🕒 Inizio previsto: {freezing_time}\n_Pericolo ghiaccio su strada!_")
|
||||||
|
should_notify = True
|
||||||
|
state["freezing_active"] = True
|
||||||
|
else:
|
||||||
|
state["freezing_active"] = False
|
||||||
|
|
||||||
# SCENARIO A: NUOVO ALLARME o PEGGIORAMENTO
|
# 2. VENTO
|
||||||
if IS_ALARM_NOW:
|
wind_level_curr = 0
|
||||||
should_notify = False
|
if max_wind > WIND_RED:
|
||||||
|
wind_level_curr = 3
|
||||||
|
msg = f"🔴 **TEMPESTA (Burrasca Fortissima)**\nRaffiche > {WIND_RED:.0f} km/h (Picco: **{max_wind:.0f}** km/h).\n🕒 Ore {max_wind_time}"
|
||||||
|
elif max_wind > WIND_ORANGE:
|
||||||
|
wind_level_curr = 2
|
||||||
|
msg = f"🟠 **VENTO MOLTO FORTE**\nRaffiche > {WIND_ORANGE:.0f} km/h (Picco: **{max_wind:.0f}** km/h).\n🕒 Ore {max_wind_time}"
|
||||||
|
elif max_wind > WIND_YELLOW:
|
||||||
|
wind_level_curr = 1
|
||||||
|
msg = f"🟡 **VENTO FORTE**\nRaffiche > {WIND_YELLOW:.0f} km/h (Picco: **{max_wind:.0f}** km/h).\n🕒 Ore {max_wind_time}"
|
||||||
|
|
||||||
# Logica Vento
|
if wind_level_curr > 0:
|
||||||
if is_wind_alarm:
|
if not WAS_ALARM or wind_level_curr > state.get("wind_level", 0):
|
||||||
# Notifica se è nuovo o se è peggiorato di 10km/h rispetto all'ultimo avviso
|
alerts.append(msg)
|
||||||
if not WAS_ALARM or max_wind > state.get("last_wind", 0) + 10:
|
should_notify = True
|
||||||
icon = "💨" if max_wind < WIND_LIMIT_CRIT else "🌪️ ⛔️"
|
state["wind_level"] = wind_level_curr
|
||||||
livello = "FORTE" if max_wind < WIND_LIMIT_CRIT else "BURRASCA"
|
state["last_wind"] = max_wind
|
||||||
alerts.append(f"{icon} **VENTO {livello}**\nRaffiche previste fino a **{max_wind:.0f} km/h** verso le {max_wind_time}.\n_Consiglio: Ritirare oggetti leggeri._")
|
|
||||||
state["last_wind"] = max_wind
|
|
||||||
should_notify = True
|
|
||||||
|
|
||||||
# Logica Pioggia
|
# 3. PIOGGIA
|
||||||
if is_rain_alarm:
|
if sum_rain_3h > RAIN_3H_LIMIT:
|
||||||
# Notifica se è nuovo o se è peggiorata di 10mm
|
if not WAS_ALARM or sum_rain_3h > state.get("last_rain", 0) + 10:
|
||||||
if not WAS_ALARM or sum_rain_3h > state.get("last_rain", 0) + 10:
|
alerts.append(f"🌧️ **PIOGGIA INTENSA**\nPrevisti **{sum_rain_3h:.1f} mm** in 3 ore.")
|
||||||
alerts.append(f"🌧️ **PIOGGIA INTENSA**\nPrevisti **{sum_rain_3h:.1f} mm** in 3 ore.\n_Rischio disagi idraulici._")
|
state["last_rain"] = sum_rain_3h
|
||||||
state["last_rain"] = sum_rain_3h
|
should_notify = True
|
||||||
should_notify = True
|
|
||||||
|
|
||||||
if should_notify:
|
# --- INVIO ---
|
||||||
full_msg = f"⚠️ **AVVISO METEO (Prossime 12h)**\n\n" + "\n\n".join(alerts)
|
IS_ALARM_NOW = (wind_level_curr > 0) or (sum_rain_3h > RAIN_3H_LIMIT) or freezing_rain_detected
|
||||||
send_telegram_message(full_msg)
|
|
||||||
state["alert_active"] = True
|
|
||||||
state["time"] = time.time()
|
|
||||||
save_state(state)
|
|
||||||
print("Allerta inviata.")
|
|
||||||
else:
|
|
||||||
print("Condizioni critiche ma stabili (già notificato).")
|
|
||||||
|
|
||||||
# SCENARIO B: ALLARME RIENTRATO
|
if should_notify and alerts:
|
||||||
elif WAS_ALARM and not IS_ALARM_NOW:
|
full_msg = f"⚠️ **AVVISO METEO SEVERO**\n\n" + "\n\n".join(alerts)
|
||||||
msg = (
|
send_telegram_message(full_msg)
|
||||||
f"🟢 **ALLERTA METEO RIENTRATA**\n"
|
state["alert_active"] = True
|
||||||
f"📅 _Aggiornamento ore {now.strftime('%H:%M')}_\n\n"
|
|
||||||
f"Vento e pioggia sono scesi sotto le soglie di guardia per le prossime 12 ore."
|
|
||||||
)
|
|
||||||
send_telegram_message(msg)
|
|
||||||
|
|
||||||
# Reset stato
|
|
||||||
state = {"alert_active": False, "last_wind": 0, "last_rain": 0, "time": time.time()}
|
|
||||||
save_state(state)
|
save_state(state)
|
||||||
print("Allarme rientrato inviato.")
|
print("Allerta inviata.")
|
||||||
|
|
||||||
|
elif WAS_ALARM and not IS_ALARM_NOW:
|
||||||
|
# Allarme rientrato
|
||||||
|
msg = f"🟢 **ALLERTA METEO RIENTRATA**\nLe condizioni sono tornate sotto le soglie di guardia."
|
||||||
|
send_telegram_message(msg)
|
||||||
|
state = {"alert_active": False, "last_wind": 0, "last_rain": 0, "wind_level": 0, "freezing_active": False}
|
||||||
|
save_state(state)
|
||||||
|
print("Allarme rientrato.")
|
||||||
|
|
||||||
else:
|
else:
|
||||||
print(f"Situazione tranquilla. Vento: {max_wind:.1f}, Pioggia: {sum_rain_3h:.1f}")
|
# Aggiorna stato parziale se serve (es. vento calato ma ancora presente)
|
||||||
|
if not IS_ALARM_NOW:
|
||||||
|
state["alert_active"] = False
|
||||||
|
save_state(state)
|
||||||
|
print(f"Nessun nuovo allarme. W:{max_wind:.0f} R:{sum_rain_3h:.1f} Ice:{freezing_rain_detected}")
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
analyze()
|
analyze()
|
||||||
|
|||||||
Reference in New Issue
Block a user