From 2859b95dbc915a1a5dd7f148478f3f66bb87efcb Mon Sep 17 00:00:00 2001 From: daniele Date: Sun, 4 Jan 2026 07:00:03 +0100 Subject: [PATCH] Backup automatico script del 2026-01-04 07:00 --- scripts/pi2-backup/._super_watchdog.sh | Bin 0 -> 4096 bytes services/telegram-bot/._bot.py | Bin 0 -> 4096 bytes services/telegram-bot/._cam.py | Bin 0 -> 4096 bytes services/telegram-bot/._meteo.py | Bin 0 -> 4096 bytes services/telegram-bot/bot.py | 424 ++++++++++--------------- services/telegram-bot/cam.py | 97 ++++++ services/telegram-bot/meteo.py | 352 ++++++++++++++++++++ services/telegram-bot/previsione7.py | 263 +++++++++++++++ 8 files changed, 880 insertions(+), 256 deletions(-) create mode 100755 scripts/pi2-backup/._super_watchdog.sh create mode 100755 services/telegram-bot/._bot.py create mode 100755 services/telegram-bot/._cam.py create mode 100755 services/telegram-bot/._meteo.py mode change 100644 => 100755 services/telegram-bot/bot.py create mode 100755 services/telegram-bot/cam.py create mode 100644 services/telegram-bot/meteo.py create mode 100755 services/telegram-bot/previsione7.py diff --git a/scripts/pi2-backup/._super_watchdog.sh b/scripts/pi2-backup/._super_watchdog.sh new file mode 100755 index 0000000000000000000000000000000000000000..814c2c124fc73908e45f7737c2ae261d61184626 GIT binary patch literal 4096 zcmZQz6=P>$Vqox1Ojhs@R)|o50+1L3ClDJkFz{^v(m+1nBL)UWIUt(=a103vf+zv$ zU^oE8(hxqJ1etyShy@rJq>}S<^%4sTa#Hnj5{pYpi&Ill5=&B*1A?KdF92z71_n`t zx`Lwovedl9yyR4d>-WMkLFyFLbQM7~BNM~R6M>og;{roL+)*$Z0;3@?8UmvsFd71* zAut*OqaiRF0;3@?8UmvsFoHsW5$IPC1_QZ}jLc$%qSWI2(xT*4g|z&lY=z9clGMDC n%>2B>oSaI9oYb@ug`}Lsylh}!7^-VXQ>gxjdqsvp?*D%PApAB` literal 0 HcmV?d00001 diff --git a/services/telegram-bot/._bot.py b/services/telegram-bot/._bot.py new file mode 100755 index 0000000000000000000000000000000000000000..4b0e3e2b7b1adb61fd2c29f6545aa1503812bddf GIT binary patch literal 4096 zcmZQz6=P>$Vqox1Ojhs@R)|o50+1L3ClDJkFz{^v(m+1nBL)UWIUt(=a103vf+zv$ zU^oE8(hxqJ1etyShy@rJq>}S<^%4sTa#Hnj5{pYpi&Ill5=&B*1A?KdF92z71_n`t zx`Lwovedl9yyR4dEO nnfZB%IXRUIIjLzS3Q0MMdD+0eFjUu&rcnJ4_lgXI-2eXoR>?PG literal 0 HcmV?d00001 diff --git a/services/telegram-bot/._cam.py b/services/telegram-bot/._cam.py new file mode 100755 index 0000000000000000000000000000000000000000..2e655ad7b8ebdfc218abd9b501c3b0b4140b28b2 GIT binary patch literal 4096 zcmZQz6=P>$Vqox1Ojhs@R)|o50+1L3ClDJkFz{^v(m+1nBL)UWIUt(=a103vf+zv$ zV3+~K+-O=D5#plB`MG+D1qC^&dId%KWvO|IdC92^j7$tKPXuP}j|&V%(>lr>4S~@R z7!85Z5Eu=C(GVC7fzc2c4S~@R7!85Z5Eu;s>>&W^Y=STt$c1EN7Aq8`7U!21C8sK+ y$Vqox1Ojhs@R)|o50+1L3ClDJkFz{^v(m+1nBL)UWIUt(=a103vf+zv$ zU^oE8(hxqJ1etySh`AXUM3eJ#^%4sTa#HmQit@`+^AhutQ$Y$(1F--DgA_tOC$YGs zv^X^-C9xz`IUpFMh>?lm<%z(|{c(YzUZ#ukeOGKnpcvU mpO=`EQ>l=XnpUEal#`g34eSd;bq#3>)&Fp>$S}zL{|^8ggf++j literal 0 HcmV?d00001 diff --git a/services/telegram-bot/bot.py b/services/telegram-bot/bot.py old mode 100644 new mode 100755 index 007b390..e800a49 --- a/services/telegram-bot/bot.py +++ b/services/telegram-bot/bot.py @@ -5,9 +5,6 @@ import datetime import requests from functools import wraps from zoneinfo import ZoneInfo -from dateutil import parser -from typing import Dict, List, Optional, Tuple - from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update from telegram.ext import ( Application, @@ -18,34 +15,28 @@ from telegram.ext import ( ) # ============================================================================= -# LOOGLE BOT V7.9 (ULTIMATE + GLOBAL GFS FIX) -# - Dashboard Sistema -# - Meteo Smart: Arome (EU), Icon (EU-Est), JMA (JP), GFS (Mondo) -# - Multi-User Security +# LOOGLE BOT V9.0 (ULTIMATE + CAMERAS + MODULAR) +# - Dashboard Sistema (SSH/WOL/Monitor) +# - Meteo Smart (Meteo.py / Previsione7.py) +# - CCTV Hub (Cam.py + FFMPEG) # ============================================================================= -# --- CONFIGURAZIONE --- +# --- CONFIGURAZIONE AMBIENTE --- BOT_TOKEN = os.environ.get('BOT_TOKEN') - -# Gestione Multi-Utente allowed_users_raw = os.environ.get('ALLOWED_USER_ID', '') ALLOWED_IDS = [int(x.strip()) for x in allowed_users_raw.split(',') if x.strip().isdigit()] -# Configurazione Sistema SSH_USER = "daniely" NAS_USER = "daniely" MASTER_IP = "192.168.128.80" - -# Configurazione Meteo -HOME_LAT = 43.9356 -HOME_LON = 12.4296 -HOME_NAME = "🏠 Casa (Strada Cà Toro)" TZ = "Europe/Rome" TZINFO = ZoneInfo(TZ) -OPEN_METEO_URL = "https://api.open-meteo.com/v1/forecast" -GEOCODING_URL = "https://geocoding-api.open-meteo.com/v1/search" -HTTP_HEADERS = {"User-Agent": "loogle-bot-v7.9"} +# --- GESTIONE PERCORSI DINAMICA (DOCKER FRIENDLY) --- +SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +METEO_SCRIPT = os.path.join(SCRIPT_DIR, "meteo.py") +METEO7_SCRIPT = os.path.join(SCRIPT_DIR, "previsione7.py") +CAM_SCRIPT = os.path.join(SCRIPT_DIR, "cam.py") # --- LISTE DISPOSITIVI --- CORE_DEVICES = [ @@ -67,17 +58,19 @@ INFRA_DEVICES = [ {"name": "🔌 Sw Tav", "ip": "192.168.128.108"} ] +# Configurazione Logging logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s", level=logging.INFO) logger = logging.getLogger(__name__) # ============================================================================= -# SEZIONE 1: FUNZIONI SISTEMA (SSH, PING, UTILS) +# SEZIONE 1: FUNZIONI UTILI E HELPER # ============================================================================= def run_cmd(command, ip=None, user=None): + """Esegue comandi shell locali o via SSH""" try: if ip == "127.0.0.1" or ip is None: - return subprocess.check_output(command, shell=True, stderr=subprocess.STDOUT, timeout=5).decode('utf-8').strip() + return subprocess.check_output(command, shell=True, stderr=subprocess.STDOUT, timeout=8).decode('utf-8').strip() else: safe_cmd = command.replace("'", "'\\''") full_cmd = f"ssh -o LogLevel=ERROR -o BatchMode=yes -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o ConnectTimeout=3 {user}@{ip} '{safe_cmd}'" @@ -104,7 +97,7 @@ def get_device_stats(device): elif dtype == "nas": t = run_cmd("cat /sys/class/hwmon/hwmon0/temp1_input 2>/dev/null || cat /sys/class/thermal/thermal_zone0/temp", ip, user) if t.isdigit(): - v = int(t); temp = f"{v/1000:.1f}°C" if v > 1000 else f"{v}°C" + v = int(t); temp = f"{v/1000:.1f}°C" if v > 1000 else f"{v}°C" if dtype == "nas": ram_cmd = "free | grep Mem | awk '{printf \"%.0f%%\", $3*100/$2}'" else: ram_cmd = "free -m | awk 'NR==2{if ($2>0) printf \"%.0f%%\", $3*100/$2; else print \"0%\"}'" @@ -118,225 +111,27 @@ def read_log_file(filepath, lines=15): except Exception as e: return f"Errore: {str(e)}" def run_speedtest(): - try: return subprocess.check_output("speedtest-cli --simple", shell=True, timeout=50).decode('utf-8') + try: return subprocess.check_output("speedtest-cli --simple", shell=True, timeout=60).decode('utf-8') except: return "Errore Speedtest" -# ============================================================================= -# SEZIONE 2: METEO INTELLIGENTE (MULTI-MODELLO MOSAICO) -# ============================================================================= - -def now_local() -> datetime.datetime: - return datetime.datetime.now(TZINFO) - -def parse_time(t: str) -> datetime.datetime: - dt = parser.isoparse(t) - if dt.tzinfo is None: return dt.replace(tzinfo=TZINFO) - return dt.astimezone(TZINFO) - -def degrees_to_cardinal(d: int) -> str: - dirs = ["N", "NE", "E", "SE", "S", "SW", "W", "NW"] - return dirs[round(d / 45) % 8] - -def get_icon_set(prec, snow, code, is_day, cloud, vis, temp, rain, gust, cape): - sky = "☁️" - if code in (95, 96, 99): sky = "⛈️" if prec > 0 else "🌩️" - elif (code in (45, 48) or vis < 1000) and prec < 1: sky = "🌫️" - elif prec >= 0.1: sky = "🌨️" if snow > 0 else "🌧️" - elif cloud <= 20: sky = "☀️" if is_day else "🌙" - elif cloud <= 40: sky = "🌤️" if is_day else "🌙" - elif cloud <= 60: sky = "⛅️" - elif cloud <= 80: sky = "🌥️" - - sgx = "-" - if snow > 0 or (code in (71,73,75,77,85,86) if code else False): sgx = "☃️" - elif temp < 0 or (code in (66,67) if code else False): sgx = "🧊" - elif cape > 2000: sgx = "🌪️" - elif cape > 1000: sgx = "⚡" - elif temp > 35: sgx = "🥵" - elif rain > 4: sgx = "☔️" - elif gust > 50: sgx = "💨" - return sky, sgx - -def get_coordinates(city_name: str): - """ - Cerca le coordinate della città con fallback EN. - """ - # 1. Tentativo ITALIANO - params = {"name": city_name, "count": 10, "language": "it", "format": "json"} +def call_script_text(script_path, args_list): + """Wrapper per lanciare script che restituiscono testo (Meteo)""" try: - r = requests.get(GEOCODING_URL, params=params, headers=HTTP_HEADERS, timeout=10) - data = r.json() - if "results" in data and len(data["results"]) > 0: - res = data["results"][0] - cc = res.get("country_code", "IT").upper() - name = f"{res.get('name')} ({cc})" - return res["latitude"], res["longitude"], name, cc - except Exception as e: logger.error(f"Geocoding IT error: {e}") - - # 2. Tentativo FALLBACK INGLESE - try: - params["language"] = "en" - r = requests.get(GEOCODING_URL, params=params, headers=HTTP_HEADERS, timeout=10) - data = r.json() - if "results" in data and len(data["results"]) > 0: - res = data["results"][0] - cc = res.get("country_code", "IT").upper() - name = f"{res.get('name')} ({cc})" - return res["latitude"], res["longitude"], name, cc - except Exception as e: logger.error(f"Geocoding EN error: {e}") - - return None - -def choose_best_model(lat, lon, cc): - """ - Seleziona il modello migliore. - Fallback Generale: NOAA GFS (perché ha sempre Visibilità/Cape). - """ - - # 1. GIAPPONE -> JMA MSM - if cc == 'JP': - return "jma_msm", "JMA MSM (5km)" - - # 2. SCANDINAVIA -> Yr.no - if cc in ['NO', 'SE', 'FI', 'DK', 'IS']: - return "metno_nordic", "Yr.no (Nordic)" - - # 3. UK & IRLANDA -> UK Met Office - if cc in ['GB', 'IE']: - return "ukmo_global", "UK MetOffice" - - # 4. TUNING ITALIA (Mosaico) - if cc == 'IT' or cc == 'SM': - # ZONA 1: Nord-Ovest, Tirreno, Sardegna (Lon <= 13.0, Lat > 40.5) - if lon <= 13.0 and lat > 40.5: - return "meteofrance_arome_france_hd", "Arome HD" - # ZONA 2: Nord-Est, Adriatico Nord/Centro - if lat >= 43.0: - return "icon_d2", "ICON-D2 (2km)" - # ZONA 3: Sud Italia -> ICON-EU (Meglio di GFS per locale) - return "icon_eu", "ICON-EU (7km)" - - # 5. RESTO DEL MONDO (Europa Centrale ICON-D2) - if cc in ['DE', 'AT', 'CH', 'LI']: - return "icon_d2", "ICON-D2" - - # 6. RESTO DEL MONDO -> NOAA GFS - # Usiamo GFS invece di ECMWF perché ECMWF spesso manca di 'visibility'/'cape' - # facendo crashare o svuotare il report. GFS è completo. - return "gfs_global", "NOAA GFS Global" - -def get_forecast(lat, lon, model): - params = { - "latitude": lat, "longitude": lon, "timezone": TZ, - "forecast_days": 3, - "models": model, - "wind_speed_unit": "kmh", "precipitation_unit": "mm", - "hourly": "temperature_2m,relative_humidity_2m,cloudcover,windspeed_10m,winddirection_10m,windgusts_10m,precipitation,rain,snowfall,weathercode,is_day,cape,visibility" - } - try: - r = requests.get(OPEN_METEO_URL, params=params, headers=HTTP_HEADERS, timeout=25) - r.raise_for_status() - return r.json() - except Exception as e: logger.error(f"Meteo API error: {e}"); return None - -def safe_get_list(hourly_data, key, length, default=None): - """Estrae una lista sicura, gestendo chiavi mancanti""" - if key in hourly_data and hourly_data[key] is not None: - return hourly_data[key] - return [default] * length - -def generate_weather_report(lat, lon, location_name, cc="IT") -> str: - model_id, model_name = choose_best_model(lat, lon, cc) - - data = get_forecast(lat, lon, model_id) - if not data: return f"❌ Errore API Meteo ({model_name})." - - hourly = data.get("hourly", {}) - times = hourly.get("time", []) - if not times: return "❌ Dati orari mancanti." - - L = len(times) - - try: - l_temp = safe_get_list(hourly, "temperature_2m", L, 0) - l_rh = safe_get_list(hourly, "relative_humidity_2m", L, 0) - l_cl = safe_get_list(hourly, "cloudcover", L, 0) - l_prec = safe_get_list(hourly, "precipitation", L, 0) - l_rain = safe_get_list(hourly, "rain", L, 0) - l_snow = safe_get_list(hourly, "snowfall", L, 0) - l_wspd = safe_get_list(hourly, "windspeed_10m", L, 0) - l_gust = safe_get_list(hourly, "windgusts_10m", L, 0) - l_wdir = safe_get_list(hourly, "winddirection_10m", L, 0) - l_code = safe_get_list(hourly, "weathercode", L, 0) - l_day = safe_get_list(hourly, "is_day", L, 1) - l_cape = safe_get_list(hourly, "cape", L, 0) - l_vis = safe_get_list(hourly, "visibility", L, 10000) - - except Exception as e: - return f"❌ Errore elaborazione dati meteo: {e}" - - now = now_local().replace(minute=0, second=0, microsecond=0) - blocks = [] - - for (label, hours_duration, step) in [("Prime 24h", 24, 1), ("Successive 24h", 24, 2)]: - end_time = now + datetime.timedelta(hours=hours_duration) - lines = [f"{'LT':<2} {'T°':>3} {'h%':>3} {'mm':>2} {'Vento':<5} {'Nv%':>3} {'Sky':<2} {'Sgx':<3}", "-" * 30] - count = 0 - for i, t_str in enumerate(times): - try: dt = parse_time(t_str) - except: continue - if dt < now or dt >= end_time: continue - if dt.hour % step != 0: continue - - try: - T = float(l_temp[i]) - Rh = int(l_rh[i] or 0) - Cl = int(l_cl[i] or 0) - Pr = float(l_prec[i] or 0) - Rn = float(l_rain[i] or 0) - Sn = float(l_snow[i] or 0) - Wspd = float(l_wspd[i] or 0) - Gust = float(l_gust[i] or 0) - Wdir = int(l_wdir[i] or 0) - Cape = float(l_cape[i] or 0) - Vis = float(l_vis[i] or 10000) - Code = int(l_code[i]) if l_code[i] is not None else None - IsDay = int(l_day[i] if l_day[i] is not None else 1) - - t_s = f"{int(round(T))}" - p_s = "--" if Pr < 0.2 else f"{int(round(Pr))}" - card = degrees_to_cardinal(Wdir) - w_val = Gust if (Gust - Wspd) > 15 else Wspd - w_txt = f"{card} {int(round(w_val))}" - if (Gust - Wspd) > 15: - g_txt = f"G{int(round(w_val))}" - if len(card)+len(g_txt) <= 5: w_txt = f"{card}{g_txt}" - elif len(card)+1+len(g_txt) <= 5: w_txt = f"{card} {g_txt}" - else: w_txt = g_txt - w_fmt = f"{w_txt:<5}" - sky, sgx = get_icon_set(Pr, Sn, Code, IsDay, Cl, Vis, T, Rn, Gust, Cape) - lines.append(f"{dt.strftime('%H'):<2} {t_s:>3} {Rh:>3} {p_s:>2} {w_fmt} {Cl:>3} {sky:<2} {sgx:<3}") - count += 1 - except: continue - if count > 0: - day_label = f"{['Lun','Mar','Mer','Gio','Ven','Sab','Dom'][now.weekday()]} {now.day}" - blocks.append(f"*{day_label} ({label})*\n```text\n" + "\n".join(lines) + "\n```") - now = end_time - - return f"🌤️ *METEO REPORT*\n📍 {location_name}\n🧠 Fonte: {model_name}\n\n" + "\n\n".join(blocks) + cmd = ["python3", script_path] + args_list + result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) + return result.stdout.strip() if result.returncode == 0 else f"⚠️ Errore Script:\n{result.stderr}" + except Exception as e: return f"❌ Errore esecuzione: {e}" # ============================================================================= -# SEZIONE 3: BOT HANDLERS & SCHEDULER +# SEZIONE 2: GESTORI COMANDI (HANDLERS) # ============================================================================= -# Decoratore Sicurezza Multi-Utente +# Decoratore Sicurezza def restricted(func): @wraps(func) async def wrapped(update: Update, context: ContextTypes.DEFAULT_TYPE, *args, **kwargs): user_id = update.effective_user.id - if user_id not in ALLOWED_IDS: - logger.warning(f"⚠️ ACCESSO NEGATO: User {user_id}") - return + if user_id not in ALLOWED_IDS: return return await func(update, context, *args, **kwargs) return wrapped @@ -345,53 +140,161 @@ async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: keyboard = [ [InlineKeyboardButton("🖥️ Core Server", callback_data="menu_core"), InlineKeyboardButton("🔍 Scan LAN", callback_data="menu_lan")], [InlineKeyboardButton("🛡️ Pi-hole", callback_data="menu_pihole"), InlineKeyboardButton("🌐 Rete", callback_data="menu_net")], - [InlineKeyboardButton("🌤️ Meteo Casa", callback_data="req_meteo_home"), InlineKeyboardButton("📜 Logs", callback_data="menu_logs")] + [InlineKeyboardButton("🌤️ Meteo Casa", callback_data="req_meteo_home"), InlineKeyboardButton("📹 Camere", callback_data="menu_cams")], + [InlineKeyboardButton("📜 Logs", callback_data="menu_logs")] ] - text = "🎛 **Loogle Control Center v7.9**\nComandi disponibili:\n🔹 `/meteo `\n🔹 Pulsanti sotto" + text = "🎛 **Loogle Control Center v9.0**\n\n🔹 `/meteo `\n🔹 `/meteo7 ` (7 Giorni)\n🔹 `/cam ` (Snapshot)" if update.message: await update.message.reply_text(text, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="Markdown") else: await update.callback_query.edit_message_text(text, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="Markdown") @restricted async def meteo_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + query = " ".join(context.args).strip() + if not query or query.lower() == "casa": + await update.message.reply_text("⏳ **Scarico Meteo Casa...**", parse_mode="Markdown") + report = call_script_text(METEO_SCRIPT, ["--home"]) + else: + await update.message.reply_text(f"🔄 Cerco '{query}'...", parse_mode="Markdown") + report = call_script_text(METEO_SCRIPT, ["--query", query]) + + await update.message.reply_text(report, parse_mode="Markdown") + +@restricted +async def meteo7_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + chat_id = update.effective_chat.id + query = "casa" + if context.args: query = " ".join(context.args) + await update.message.reply_text(f"📡 Calcolo previsione 7gg per: {query}...", parse_mode="Markdown") + subprocess.Popen(["python3", METEO7_SCRIPT, query, "--chat_id", str(chat_id)]) + +@restricted +async def cam_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: if not context.args: - await update.message.reply_text("⚠️ Usa: `/meteo ` (es. `/meteo Rimini`)", parse_mode="Markdown") + # Se non c'è argomento, mostra il menu camere + keyboard = [ + [InlineKeyboardButton("📷 Sala", callback_data="req_cam_sala"), InlineKeyboardButton("📷 Ingresso", callback_data="req_cam_ingresso")], + [InlineKeyboardButton("📷 Taverna", callback_data="req_cam_taverna"), InlineKeyboardButton("📷 Retro", callback_data="req_cam_retro")], + [InlineKeyboardButton("📷 Matrim.", callback_data="req_cam_matrimoniale"), InlineKeyboardButton("📷 Luca", callback_data="req_cam_luca")], + [InlineKeyboardButton("⬅️ Indietro", callback_data="main_menu")] + ] + await update.message.reply_text("📹 **Scegli una telecamera:**", reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="Markdown") return - city = " ".join(context.args) - await update.message.reply_text(f"🔄 Cerco '{city}'...", parse_mode="Markdown") + cam_name = context.args[0] + await update.message.reply_chat_action(action="upload_photo") - coords = get_coordinates(city) - if coords: - lat, lon, name, cc = coords - report = generate_weather_report(lat, lon, name, cc) - await update.message.reply_text(report, parse_mode="Markdown") - else: - await update.message.reply_text(f"❌ Città '{city}' non trovata.", parse_mode="Markdown") + try: + # Timeout 15s per RTSP + result = subprocess.run(["python3", CAM_SCRIPT, cam_name], capture_output=True, text=True, timeout=15) + output = result.stdout.strip() + + if output.startswith("OK:"): + img_path = output.split(":", 1)[1] + await update.message.reply_photo(photo=open(img_path, 'rb'), caption=f"📷 **{cam_name.capitalize()}**") + elif output.startswith("ERR:"): + await update.message.reply_text(output.split(":", 1)[1]) + else: + await update.message.reply_text(f"❌ Risposta imprevista dallo script: {output}") + + except Exception as e: + await update.message.reply_text(f"❌ Errore critico: {e}") async def scheduled_morning_report(context: ContextTypes.DEFAULT_TYPE) -> None: - logger.info("⏰ Invio report automatico meteo...") - # Forza "SM" per casa -> Arome/IconD2 in base alla posizione - report = generate_weather_report(HOME_LAT, HOME_LON, HOME_NAME, "SM") + # Meteo automatico alle 8:00 + report = call_script_text(METEO_SCRIPT, ["--home"]) for uid in ALLOWED_IDS: try: await context.bot.send_message(chat_id=uid, text=report, parse_mode="Markdown") except: pass +@restricted +async def clip_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + if not context.args: + await update.message.reply_text("⚠️ Usa: `/clip ` (es. /clip sala)", parse_mode="Markdown") + return + + cam_name = context.args[0] + await update.message.reply_chat_action(action="upload_video") # Icona "sta inviando video..." + await update.message.reply_text(f"🎥 **Registro 10s da {cam_name}...**", parse_mode="Markdown") + + try: + # Lancia lo script con flag --video + result = subprocess.run(["python3", CAM_SCRIPT, cam_name, "--video"], capture_output=True, text=True, timeout=20) + output = result.stdout.strip() + + if output.startswith("OK:"): + vid_path = output.split(":", 1)[1] + await update.message.reply_video(video=open(vid_path, 'rb'), caption=f"🎥 **Clip: {cam_name.capitalize()}**") + elif output.startswith("ERR:"): + await update.message.reply_text(output.split(":", 1)[1]) + + except Exception as e: + await update.message.reply_text(f"❌ Errore critico: {e}") + @restricted async def button_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: query = update.callback_query - await query.answer() + await query.answer() # Risposta immediata per togliere il loading dal pulsante data = query.data - if data == "main_menu": await start(update, context) - + # --- NAVIGAZIONE MENU --- + if data == "main_menu": + await start(update, context) + + # --- SEZIONE METEO --- elif data == "req_meteo_home": await query.edit_message_text("⏳ **Scaricamento Meteo Casa...**", parse_mode="Markdown") - # Forza "SM" per casa - report = generate_weather_report(HOME_LAT, HOME_LON, HOME_NAME, "SM") + report = call_script_text(METEO_SCRIPT, ["--home"]) keyboard = [[InlineKeyboardButton("⬅️ Indietro", callback_data="main_menu")]] await query.edit_message_text(report, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="Markdown") + # --- SEZIONE CAMERE --- + elif data == "menu_cams": + keyboard = [ + [InlineKeyboardButton("📷 Sala", callback_data="req_cam_sala"), InlineKeyboardButton("📷 Ingresso", callback_data="req_cam_ingresso")], + [InlineKeyboardButton("📷 Taverna", callback_data="req_cam_taverna"), InlineKeyboardButton("📷 Retro", callback_data="req_cam_retro")], + [InlineKeyboardButton("📷 Matrim.", callback_data="req_cam_matrimoniale"), InlineKeyboardButton("📷 Luca", callback_data="req_cam_luca")], + [InlineKeyboardButton("⬅️ Indietro", callback_data="main_menu")] + ] + await query.edit_message_text("📹 **Centrale Video**\nSeleziona una telecamera:", reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="Markdown") + + elif data.startswith("req_cam_"): + cam_name = data.replace("req_cam_", "") + # Non editiamo il messaggio, inviamo una nuova foto sotto + try: + result = subprocess.run(["python3", CAM_SCRIPT, cam_name], capture_output=True, text=True, timeout=15) + output = result.stdout.strip() + + if output.startswith("OK:"): + img_path = output.split(":", 1)[1] + await context.bot.send_photo(chat_id=update.effective_chat.id, photo=open(img_path, 'rb'), caption=f"📷 **{cam_name.capitalize()}**") + elif output.startswith("ERR:"): + await context.bot.send_message(chat_id=update.effective_chat.id, text=output.split(":", 1)[1]) + except Exception as e: + await context.bot.send_message(chat_id=update.effective_chat.id, text=f"❌ Errore richiesta cam: {e}") + + elif data.startswith("req_vid_"): + cam_name = data.replace("req_vid_", "") + await query.answer("🎥 Registrazione in corso (10s)...") + # Inviamo un messaggio di attesa perché ci mette un po' + msg = await context.bot.send_message(chat_id=update.effective_chat.id, text=f"⏳ Registro clip: {cam_name}...") + + try: + result = subprocess.run(["python3", CAM_SCRIPT, cam_name, "--video"], capture_output=True, text=True, timeout=20) + output = result.stdout.strip() + + # Cancelliamo il messaggio di attesa + await context.bot.delete_message(chat_id=update.effective_chat.id, message_id=msg.message_id) + + if output.startswith("OK:"): + vid_path = output.split(":", 1)[1] + await context.bot.send_video(chat_id=update.effective_chat.id, video=open(vid_path, 'rb'), caption=f"🎥 **Clip: {cam_name.capitalize()}**") + elif output.startswith("ERR:"): + await context.bot.send_message(chat_id=update.effective_chat.id, text=output.split(":", 1)[1]) + except Exception as e: + await context.bot.send_message(chat_id=update.effective_chat.id, text=f"❌ Errore: {e}") + + # --- SEZIONE SISTEMA CORE --- elif data == "menu_core": keyboard = [] for i, dev in enumerate(CORE_DEVICES): keyboard.append([InlineKeyboardButton(dev['name'], callback_data=f"stat_{i}")]) @@ -410,6 +313,7 @@ async def button_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> await query.edit_message_text(f"⏳ Controllo {dev['name']}...", parse_mode="Markdown") await query.edit_message_text(f"🔹 **{dev['name']}**\n\n{get_device_stats(dev)}", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("⬅️ Indietro", callback_data="menu_core")]]), parse_mode="Markdown") + # --- SEZIONE LAN --- elif data == "menu_lan": await query.edit_message_text("⏳ **Scansione LAN...**", parse_mode="Markdown") report = "🔍 **DIAGNOSTICA LAN**\n\n" @@ -433,6 +337,7 @@ async def button_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> res = run_cmd("reboot", dev['ip'], "admin") await query.edit_message_text(f"⚡ Inviato a {dev['name']}...\n\n`{res}`", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("⬅️ Indietro", callback_data="menu_reboot")]]), parse_mode="Markdown") + # --- SEZIONE PI-HOLE --- elif data == "menu_pihole": status_raw = run_cmd("sudo pihole status", MASTER_IP, SSH_USER) icon = "✅" if "Enabled" in status_raw or "enabled" in status_raw else "🔴" @@ -446,16 +351,18 @@ async def button_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> elif "restart" in data: run_cmd("sudo systemctl restart pihole-FTL", MASTER_IP, SSH_USER) await query.edit_message_text("✅ Comando inviato.", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("⬅️ Indietro", callback_data="menu_pihole")]]), parse_mode="Markdown") + # --- SEZIONE RETE --- elif data == "menu_net": ip = run_cmd("curl -s ifconfig.me") keyboard = [[InlineKeyboardButton("🚀 Speedtest", callback_data="net_speedtest")], [InlineKeyboardButton("⬅️ Indietro", callback_data="main_menu")]] await query.edit_message_text(f"🌐 **Rete**\n🌍 IP: `{ip}`", reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="Markdown") elif data == "net_speedtest": - await query.edit_message_text("🚀 **Speedtest...**", parse_mode="Markdown") + await query.edit_message_text("🚀 **Speedtest... (attendi 40s)**", parse_mode="Markdown") res = run_speedtest() await query.edit_message_text(f"🚀 **Risultato:**\n\n`{res}`", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("⬅️ Indietro", callback_data="menu_net")]]), parse_mode="Markdown") + # --- SEZIONE LOGS --- elif data == "menu_logs": keyboard = [[InlineKeyboardButton("🐶 Watchdog", callback_data="log_wd"), InlineKeyboardButton("💾 Backup", callback_data="log_bk")], [InlineKeyboardButton("🔄 NPM Sync", callback_data="log_npm"), InlineKeyboardButton("⬅️ Indietro", callback_data="main_menu")]] await query.edit_message_text("📜 **Logs**", reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="Markdown") @@ -466,19 +373,24 @@ async def button_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> await query.edit_message_text(f"📜 **Log:**\n\n`{log_c}`", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("⬅️ Indietro", callback_data="menu_logs")]]), parse_mode="Markdown") def main(): - logger.info("Avvio Loogle Bot v7.9 (Global GFS Fix)...") + logger.info("Avvio Loogle Bot v9.0 (Modular)...") application = Application.builder().token(BOT_TOKEN).build() - - # Handlers + + # Registrazione Comandi application.add_handler(CommandHandler("start", start)) application.add_handler(CommandHandler("meteo", meteo_command)) + application.add_handler(CommandHandler("meteo7", meteo7_command)) + application.add_handler(CommandHandler("cam", cam_command)) + application.add_handler(CommandHandler("clip", clip_command)) + + # Registrazione Callback Menu application.add_handler(CallbackQueryHandler(button_handler)) - # SCHEDULER + # Scheduler job_queue = application.job_queue job_queue.run_daily(scheduled_morning_report, time=datetime.time(hour=8, minute=0, tzinfo=TZINFO), days=(0, 1, 2, 3, 4, 5, 6)) application.run_polling() if __name__ == "__main__": - main() + main() \ No newline at end of file diff --git a/services/telegram-bot/cam.py b/services/telegram-bot/cam.py new file mode 100755 index 0000000..2025ca4 --- /dev/null +++ b/services/telegram-bot/cam.py @@ -0,0 +1,97 @@ +#!/usr/bin/env python3 +import argparse +import subprocess +import os +import sys + +# --- CONFIGURAZIONE HIKVISION --- +RTSP_USER = "admin" +RTSP_PASS = "@Dedelove1" +RTSP_PORT = "554" +RTSP_PATH = "/Streaming/channels/101" + +def get_url(ip): + return f"rtsp://{RTSP_USER}:{RTSP_PASS}@{ip}:{RTSP_PORT}{RTSP_PATH}" + +CAM_CONFIG = { + "matrimoniale": get_url("192.168.135.2"), + "luca": get_url("192.168.135.3"), + "ingresso": get_url("192.168.135.4"), + "sala": get_url("192.168.135.5"), + "taverna": get_url("192.168.135.6"), + "retro": get_url("192.168.135.7"), +} + +OUTPUT_PHOTO = "/tmp/cam_snapshot.jpg" +OUTPUT_VIDEO = "/tmp/cam_video.mp4" + +def get_cam_key(cam_name): + cam_key = cam_name.lower().strip() + if cam_key.startswith("cam "): cam_key = cam_key.replace("cam ", "") + + if cam_key in CAM_CONFIG: return cam_key + + for key in CAM_CONFIG: + if key in cam_key: return key + return None + +def get_media(cam_name, is_video=False): + key = get_cam_key(cam_name) + if not key: + return None, f"❌ Camera '{cam_name}' non trovata.\nDisponibili: {', '.join(CAM_CONFIG.keys())}" + + rtsp_url = CAM_CONFIG[key] + output_file = OUTPUT_VIDEO if is_video else OUTPUT_PHOTO + + if is_video: + # COMANDO VIDEO (10 SECONDI) + # -t 10: Durata 10 secondi + # -c:v copy: COPIA il flusso video senza ricodificarlo (Zero CPU, Istantaneo) + # Se Telegram non legge il video, cambia "-c:v copy" in "-c:v libx264 -preset ultrafast" + cmd = [ + "ffmpeg", "-y", + "-rtsp_transport", "tcp", + "-i", rtsp_url, + "-t", "10", # Durata clip + "-c:v", "libx264", # Ricodifica leggera per compatibilità Telegram garantita + "-preset", "ultrafast", # Velocissimo per non caricare la CPU + "-an", # Rimuovi Audio (togli questa riga se vuoi l'audio) + "-f", "mp4", + output_file + ] + else: + # COMANDO FOTO + cmd = [ + "ffmpeg", "-y", + "-rtsp_transport", "tcp", + "-i", rtsp_url, + "-frames:v", "1", + "-q:v", "2", + output_file + ] + + try: + subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=15, check=True) + + if os.path.exists(output_file) and os.path.getsize(output_file) > 0: + return output_file, None + else: + return None, "❌ Errore: File output vuoto." + + except subprocess.TimeoutExpired: + return None, "⏰ Timeout: La cam non risponde." + except Exception as e: + return None, f"❌ Errore: {e}" + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("name", help="Nome della camera") + parser.add_argument("--video", action="store_true", help="Registra video clip invece di foto") + args = parser.parse_args() + + path, error = get_media(args.name, args.video) + + if path: + print(f"OK:{path}") + else: + print(f"ERR:{error}") \ No newline at end of file diff --git a/services/telegram-bot/meteo.py b/services/telegram-bot/meteo.py new file mode 100644 index 0000000..cd0703c --- /dev/null +++ b/services/telegram-bot/meteo.py @@ -0,0 +1,352 @@ +#!/usr/bin/env python3 +import requests +import datetime +import argparse +import sys +import logging +from zoneinfo import ZoneInfo +from dateutil import parser as date_parser # pyright: ignore[reportMissingModuleSource] + +# Setup logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +# --- CONFIGURAZIONE METEO --- +HOME_LAT = 43.9356 +HOME_LON = 12.4296 +HOME_NAME = "🏠 Casa (Wide View ±12km)" +TZ = "Europe/Rome" +TZINFO = ZoneInfo(TZ) + +# Offset ~12-15km +OFFSET_LAT = 0.12 +OFFSET_LON = 0.16 + +OPEN_METEO_URL = "https://api.open-meteo.com/v1/forecast" +GEOCODING_URL = "https://geocoding-api.open-meteo.com/v1/search" +HTTP_HEADERS = {"User-Agent": "loogle-bot-v10.4"} + +def now_local() -> datetime.datetime: + return datetime.datetime.now(TZINFO) + +def parse_time(t: str) -> datetime.datetime: + try: + dt = date_parser.isoparse(t) + if dt.tzinfo is None: return dt.replace(tzinfo=TZINFO) + return dt.astimezone(TZINFO) + except Exception as e: + logger.error(f"Time parse error: {e}") + return now_local() + +def degrees_to_cardinal(d: int) -> str: + dirs = ["N", "NE", "E", "SE", "S", "SW", "W", "NW"] + try: + return dirs[round(d / 45) % 8] + except: return "N" + +def get_icon_set(prec, snow, code, is_day, cloud, vis, temp, rain, gust, cape, cloud_type): + sky = "☁️" + try: + if cloud_type == 'F': + sky = "🌫️" + elif code in (95, 96, 99): sky = "⛈️" if prec > 0 else "🌩️" + elif prec >= 0.1: sky = "🌨️" if snow > 0 else "🌧️" + else: + # LOGICA PERCEZIONE UMANA + if cloud_type == 'H': + if cloud <= 40: sky = "☀️" if is_day else "🌙" + elif cloud <= 80: sky = "🌤️" if is_day else "🌙" + else: sky = "🌥️" + else: + if cloud <= 15: sky = "☀️" if is_day else "🌙" + elif cloud <= 35: sky = "🌤️" if is_day else "🌙" + elif cloud <= 60: sky = "⛅️" + elif cloud <= 85: sky = "🌥️" + else: sky = "☁️" + + sgx = "-" + if snow > 0 or (code is not None and code in (71,73,75,77,85,86)): sgx = "☃️" + elif temp < 0 or (code is not None and code in (66,67)): sgx = "🧊" + elif cape > 2000: sgx = "🌪️" + elif cape > 1000: sgx = "⚡" + elif temp > 35: sgx = "🥵" + elif rain > 4: sgx = "☔️" + elif gust > 50: sgx = "💨" + + return sky, sgx + except Exception as e: + logger.error(f"Icon error: {e}") + return "❓", "-" + +def get_coordinates(city_name: str): + params = {"name": city_name, "count": 1, "language": "it", "format": "json"} + try: + r = requests.get(GEOCODING_URL, params=params, headers=HTTP_HEADERS, timeout=10) + data = r.json() + if "results" in data and data["results"]: + res = data["results"][0] + cc = res.get("country_code", "IT").upper() + name = f"{res.get('name')} ({cc})" + return res["latitude"], res["longitude"], name, cc + except Exception as e: + logger.error(f"Geocoding error: {e}") + return None + +def choose_best_model(lat, lon, cc): + if cc == 'JP': return "jma_msm", "JMA MSM" + if cc in ['NO', 'SE', 'FI', 'DK', 'IS']: return "metno_nordic", "Yr.no" + if cc in ['GB', 'IE']: return "ukmo_global", "UK MetOffice" + if cc == 'IT' or cc == 'SM': return "meteofrance_arome_france_hd", "AROME HD" + if cc in ['DE', 'AT', 'CH', 'LI', 'FR']: return "icon_d2", "ICON-D2" + return "gfs_global", "NOAA GFS" + +def get_forecast(lat, lon, model): + lats = [lat, lat + OFFSET_LAT, lat - OFFSET_LAT, lat, lat] + lons = [lon, lon, lon, lon + OFFSET_LON, lon - OFFSET_LON] + + lat_str = ",".join(map(str, lats)) + lon_str = ",".join(map(str, lons)) + + params = { + "latitude": lat_str, "longitude": lon_str, "timezone": TZ, + "forecast_days": 3, + "models": model, + "wind_speed_unit": "kmh", "precipitation_unit": "mm", + "hourly": "temperature_2m,apparent_temperature,relative_humidity_2m,cloud_cover,cloud_cover_low,cloud_cover_mid,cloud_cover_high,windspeed_10m,winddirection_10m,windgusts_10m,precipitation,rain,snowfall,weathercode,is_day,cape,visibility,uv_index" + } + try: + r = requests.get(OPEN_METEO_URL, params=params, headers=HTTP_HEADERS, timeout=25) + if r.status_code != 200: + logger.error(f"API Error {r.status_code}: {r.text}") + return None + return r.json() + except Exception as e: + logger.error(f"Request error: {e}") + return None + +def safe_get_list(hourly_data, key, length, default=None): + if key in hourly_data and hourly_data[key] is not None: + return hourly_data[key] + return [default] * length + +def get_val(val, default=0.0): + if val is None: return default + return float(val) + +def generate_weather_report(lat, lon, location_name, debug_mode=False, cc="IT") -> str: + model_id, model_name = choose_best_model(lat, lon, cc) + + data_list = get_forecast(lat, lon, model_id) + if not data_list: return f"❌ Errore API Meteo ({model_name})." + if not isinstance(data_list, list): data_list = [data_list] + + data_center = data_list[0] + hourly_c = data_center.get("hourly", {}) + times = hourly_c.get("time", []) + if not times: return "❌ Dati orari mancanti." + + L = len(times) + + # --- DATI LOCALI (CASA) --- + l_temp = safe_get_list(hourly_c, "temperature_2m", L, 0) + l_app = safe_get_list(hourly_c, "apparent_temperature", L, 0) + l_rh = safe_get_list(hourly_c, "relative_humidity_2m", L, 50) + l_prec = safe_get_list(hourly_c, "precipitation", L, 0) + l_rain = safe_get_list(hourly_c, "rain", L, 0) + l_snow = safe_get_list(hourly_c, "snowfall", L, 0) + l_wspd = safe_get_list(hourly_c, "windspeed_10m", L, 0) + l_gust = safe_get_list(hourly_c, "windgusts_10m", L, 0) + l_wdir = safe_get_list(hourly_c, "winddirection_10m", L, 0) + l_code = safe_get_list(hourly_c, "weathercode", L, 0) + l_day = safe_get_list(hourly_c, "is_day", L, 1) + l_cape = safe_get_list(hourly_c, "cape", L, 0) + l_vis = safe_get_list(hourly_c, "visibility", L, 10000) + l_uv = safe_get_list(hourly_c, "uv_index", L, 0) + + # Estraggo anche i dati nuvole LOCALI per il tipo + l_cl_low_loc = safe_get_list(hourly_c, "cloud_cover_low", L, 0) + l_cl_mid_loc = safe_get_list(hourly_c, "cloud_cover_mid", L, 0) + l_cl_hig_loc = safe_get_list(hourly_c, "cloud_cover_high", L, 0) + + # --- DATI GLOBALI (MEDIA) --- + acc_cl_tot = [0.0] * L + points_cl_tot = [ [] for _ in range(L) ] + p_names = ["Casa", "Nord", "Sud", "Est", "Ovest"] + + for d in data_list: + h = d.get("hourly", {}) + for i in range(L): + cc = get_val(safe_get_list(h, "cloud_cover", L)[i]) + cl = get_val(safe_get_list(h, "cloud_cover_low", L)[i]) + cm = get_val(safe_get_list(h, "cloud_cover_mid", L)[i]) + ch = get_val(safe_get_list(h, "cloud_cover_high", L)[i]) + + real_point_total = max(cc, cl, cm, ch) + + acc_cl_tot[i] += real_point_total + points_cl_tot[i].append(real_point_total) + + num_points = len(data_list) + avg_cl_tot = [x / num_points for x in acc_cl_tot] + + if debug_mode: + output = f"🔍 **DEBUG 5 PUNTI (V10.4)**\n" + now_h = now_local().replace(minute=0, second=0, microsecond=0) + idx = 0 + for i, t_str in enumerate(times): + if parse_time(t_str) >= now_h: + idx = i + break + + # Valori Locali + loc_L = get_val(l_cl_low_loc[idx]) + loc_H = get_val(l_cl_hig_loc[idx]) + + output += f"Ora: {parse_time(times[idx]).strftime('%H:%M')}\n" + output += f"📍 **LOCALE (Casa)**: L:{int(loc_L)}% | M:{int(get_val(l_cl_mid_loc[idx]))}% | H:{int(loc_H)}%\n" + output += f"🌍 **MEDIA GLOBALE**: {int(avg_cl_tot[idx])}%\n" + + decision = "H" + if loc_L > 40: decision = "L (Priorità Locale)" + output += f"👉 **Decisione**: {decision}\n" + return output + + now = now_local().replace(minute=0, second=0, microsecond=0) + blocks = [] + header = f"{'LT':<2} {'T°':>4} {'h%':>3} {'mm':>3} {'Vento':<5} {'Nv%':>5} {'Sk':<2} {'Sx':<2}" + separator = "-" * 31 + + for (label, hours_duration, step) in [("Prime 24h", 24, 1), ("Successive 24h", 24, 2)]: + end_time = now + datetime.timedelta(hours=hours_duration) + lines = [header, separator] + count = 0 + + for i, t_str in enumerate(times): + try: + dt = parse_time(t_str) + if dt < now or dt >= end_time: continue + if dt.hour % step != 0: continue + + T = get_val(l_temp[i], 0) + App = get_val(l_app[i], 0) + Rh = int(get_val(l_rh[i], 50)) + + t_suffix = "" + diff = App - T + if diff <= -2.5: t_suffix = "W" + elif diff >= 2.5: t_suffix = "H" + t_s = f"{int(round(T))}{t_suffix}" + + Pr = get_val(l_prec[i], 0) + Sn = get_val(l_snow[i], 0) + Code = int(l_code[i]) if l_code[i] is not None else 0 + + p_suffix = "" + if Code in [96, 99]: p_suffix = "G" + elif Code in [66, 67]: p_suffix = "Z" + elif Sn > 0 or Code in [71, 73, 75, 77, 85, 86]: p_suffix = "N" + p_s = "--" if Pr < 0.2 else f"{int(round(Pr))}{p_suffix}" + + # --- CLOUD LOGIC (V10.4: LOCAL PRIORITY) --- + + # Usiamo la MEDIA per la quantità (Panoramica) + c_avg_tot = int(avg_cl_tot[i]) + + # Usiamo i dati LOCALI per il tipo (Cosa ho sulla testa) + loc_L = get_val(l_cl_low_loc[i]) + loc_M = get_val(l_cl_mid_loc[i]) + loc_H = get_val(l_cl_hig_loc[i]) + Vis = get_val(l_vis[i], 10000) + + # Step 1: Default matematico LOCALE + types = {'L': loc_L, 'M': loc_M, 'H': loc_H} + dominant_type = max(types, key=types.get) + + # Quantità da mostrare: Media Globale + Cl = c_avg_tot + + # Step 2: Override Tattico LOCALE + # Se LOCALMENTE le basse sono > 40%, vincono loro. + # (Soglia abbassata a 40 per catturare il 51%) + if loc_L > 40: + dominant_type = 'L' + # Se localmente è nuvoloso basso, forziamo la copertura visiva alta + # anche se la media globale è più bassa + if Cl < loc_L: Cl = int(loc_L) + + # Step 3: Nebbia (F) + is_fog = False + if Vis < 2000 or Code in [45, 48]: + is_fog = True + elif Rh >= 96 and loc_L > 40: + is_fog = True + + if is_fog: + dominant_type = 'F' + if Cl < 100: Cl = 100 + + # Check varianza spaziale + min_p = min(points_cl_tot[i]) + max_p = max(points_cl_tot[i]) + var_symbol = "" + if (max_p - min_p) > 20: + var_symbol = "~" + + cl_str = f"{var_symbol}{Cl}{dominant_type}" + + UV = get_val(l_uv[i], 0) + uv_suffix = "" + if UV >= 10: uv_suffix = "E" + elif UV >= 7: uv_suffix = "H" + + Wspd = get_val(l_wspd[i], 0) + Gust = get_val(l_gust[i], 0) + Wdir = int(get_val(l_wdir[i], 0)) + Cape = get_val(l_cape[i], 0) + IsDay = int(l_day[i]) if l_day[i] is not None else 1 + + card = degrees_to_cardinal(Wdir) + w_val = Gust if (Gust - Wspd) > 15 else Wspd + w_txt = f"{card} {int(round(w_val))}" + if (Gust - Wspd) > 15: + g_txt = f"G{int(round(w_val))}" + if len(card)+len(g_txt) <= 5: w_txt = f"{card}{g_txt}" + elif len(card)+1+len(g_txt) <= 5: w_txt = f"{card} {g_txt}" + else: w_txt = g_txt + w_fmt = f"{w_txt:<5}" + + sky, sgx = get_icon_set(Pr, Sn, Code, IsDay, Cl, Vis, T, get_val(l_rain[i], 0), Gust, Cape, dominant_type) + sky_fmt = f"{sky}{uv_suffix}" + + lines.append(f"{dt.strftime('%H'):<2} {t_s:>4} {Rh:>3} {p_s:>3} {w_fmt} {cl_str:>5} {sky_fmt:<2} {sgx:<2}") + count += 1 + + except Exception as e: + logger.error(f"Errore riga meteo {i}: {e}") + continue + + if count > 0: + day_label = f"{['Lun','Mar','Mer','Gio','Ven','Sab','Dom'][now.weekday()]} {now.day}" + blocks.append(f"*{day_label} ({label})*\n```text\n" + "\n".join(lines) + "\n```") + now = end_time + + return f"🌤️ *METEO REPORT*\n📍 {location_name}\n🧠 Fonte: {model_name}\n\n" + "\n\n".join(blocks) + +if __name__ == "__main__": + args_parser = argparse.ArgumentParser() + args_parser.add_argument("--query", help="Nome città") + args_parser.add_argument("--home", action="store_true", help="Usa coordinate casa") + args_parser.add_argument("--debug", action="store_true", help="Mostra i valori dei 5 punti") + args = args_parser.parse_args() + + if args.home: + print(generate_weather_report(HOME_LAT, HOME_LON, HOME_NAME, args.debug, "SM")) + elif args.query: + coords = get_coordinates(args.query) + if coords: + lat, lon, name, cc = coords + print(generate_weather_report(lat, lon, name, args.debug, cc)) + else: + print(f"❌ Città '{args.query}' non trovata.") + else: + print("Uso: meteo.py --query 'Nome Città' oppure --home [--debug]") \ No newline at end of file diff --git a/services/telegram-bot/previsione7.py b/services/telegram-bot/previsione7.py new file mode 100755 index 0000000..a9eb45f --- /dev/null +++ b/services/telegram-bot/previsione7.py @@ -0,0 +1,263 @@ +#!/usr/bin/env python3 +import requests +import argparse +import datetime +import os +import sys +from zoneinfo import ZoneInfo +from collections import defaultdict + +# --- CONFIGURAZIONE DEFAULT --- +DEFAULT_LAT = 43.9356 +DEFAULT_LON = 12.4296 +DEFAULT_NAME = "🏠 Casa (Strada Cà Toro)" + +# --- TIMEZONE --- +TZ_STR = "Europe/Rome" + +# --- TELEGRAM CONFIG --- +ADMIN_CHAT_ID = "64463169" +TOKEN_FILE_HOME = os.path.expanduser("~/.telegram_dpc_bot_token") +TOKEN_FILE_ETC = "/etc/telegram_dpc_bot_token" + +# --- SOGLIE --- +SOGLIA_VENTO_KMH = 40.0 +MIN_MM_PER_EVENTO = 0.1 + +def get_bot_token(): + for path in [TOKEN_FILE_ETC, TOKEN_FILE_HOME]: + if os.path.exists(path): + try: + with open(path, 'r') as f: return f.read().strip() + except: pass + sys.exit(1) + +def get_coordinates(query): + if not query or query.lower() == "casa": + return DEFAULT_LAT, DEFAULT_LON, DEFAULT_NAME + url = "https://geocoding-api.open-meteo.com/v1/search" + try: + resp = requests.get(url, params={"name": query, "count": 1, "language": "it", "format": "json"}, timeout=5) + res = resp.json().get("results", [])[0] + return res['latitude'], res['longitude'], f"{res.get('name')} ({res.get('country_code','')})" + except: return None, None, None + +def get_weather(lat, lon): + url = "https://api.open-meteo.com/v1/forecast" + params = { + "latitude": lat, "longitude": lon, + "hourly": "temperature_2m,precipitation_probability,precipitation,weathercode,windspeed_10m,dewpoint_2m", + "daily": "temperature_2m_max,temperature_2m_min,sunrise,sunset", + "timezone": TZ_STR, "models": "best_match", "forecast_days": 8 + } + try: + resp = requests.get(url, params=params, timeout=10) + resp.raise_for_status() + return resp.json() + except: return None + +def get_precip_type(code): + """Definisce il tipo di precipitazione in base al codice WMO.""" + # Neve (71-77, 85-86) + if (71 <= code <= 77) or code in [85, 86]: return "❄️ Neve" + # Grandine (96-99) + if code in [96, 99]: return "⚡🌨 Grandine" + # Pioggia congelantesi (66-67) + if code in [66, 67]: return "🧊☔ Pioggia Congelantesi" + # Pioggia standard + return "☔ Pioggia" + +def get_intensity_label(mm_h): + if mm_h < 2.5: return "Debole" + if mm_h < 7.6: return "Moderata" + return "Forte ⚠️" + +def analyze_daily_events(times, codes, probs, precip, winds, temps, dewpoints): + """Scansiona le 24 ore e trova blocchi di eventi continui.""" + events = [] + + # ========================================== + # 1. LIVELLO PERICOLI (Ghiaccio, Gelo, Brina) + # ========================================== + in_ice = False + start_ice = 0 + ice_type = "" + + for i in range(len(times)): + t = temps[i] + d = dewpoints[i] + p = precip[i] + c = codes[i] + + current_ice_condition = None + + # A. GELICIDIO (Pericolo massimo) + # Se il codice è esplicitamente Gelicidio (66,67) OPPURE piove (codici pioggia) con T < 0 + is_raining_code = (50 <= c <= 69) or (80 <= c <= 82) + if c in [66, 67] or (p > 0 and t <= 0 and is_raining_code): + current_ice_condition = "🧊☠️ GELICIDIO" + + # B. GHIACCIO/BRINA (Strada Scivolosa) + # Niente precipitazioni, T bassa (<2°C) e DewPoint vicinissimo alla T (<1°C diff) + elif p == 0 and t <= 2.0 and (t - d) < 1.0: + current_ice_condition = "⛸️⚠️ GHIACCIO/BRINA (Strada Scivolosa)" + + # C. GELATA SEMPLICE (T < 0) + elif t < 0: + current_ice_condition = "❄️ Gelata notturna" + + # Logica raggruppamento + if current_ice_condition and not in_ice: + in_ice = True + start_ice = i + ice_type = current_ice_condition + elif (not current_ice_condition and in_ice) or (in_ice and current_ice_condition != ice_type) or (in_ice and i == len(times)-1): + end_idx = i if not current_ice_condition else i + if end_idx > start_ice: + start_time = times[start_ice].split("T")[1][:5] + end_time = times[end_idx].split("T")[1][:5] + min_t_block = min(temps[start_ice:end_idx+1]) + events.append(f"{ice_type}: {start_time}-{end_time} (Min: {min_t_block}°C)") + + in_ice = False + if current_ice_condition: + in_ice = True + start_ice = i + ice_type = current_ice_condition + + # ========================================== + # 2. LIVELLO PRECIPITAZIONI (Pioggia, Neve) + # ========================================== + # Nota: Non sopprimiamo più nulla. Se nevica mentre gela, li segnaliamo entrambi. + in_rain = False + start_idx = 0 + current_rain_type = "" + + for i in range(len(times)): + is_raining = precip[i] >= MIN_MM_PER_EVENTO + + if is_raining and not in_rain: + in_rain = True + start_idx = i + current_rain_type = get_precip_type(codes[i]) + + # Cambio tipo precipitazione (es. da Pioggia a Neve nello stesso blocco) + elif in_rain and is_raining and get_precip_type(codes[i]) != current_rain_type: + # Chiudiamo il blocco precedente e ne apriamo uno nuovo + end_idx = i + block_precip = precip[start_idx:end_idx] + tot_mm = sum(block_precip) + max_prob = max(probs[start_idx:end_idx]) + start_time = times[start_idx].split("T")[1][:5] + end_time = times[end_idx].split("T")[1][:5] # Qui combacia + avg_intensity = tot_mm / len(block_precip) + + events.append( + f"{current_rain_type} ({get_intensity_label(avg_intensity)}):\n" + f" 🕒 {start_time}-{end_time} | 💧 {tot_mm:.1f}mm | Prob: {max_prob}%" + ) + + # Riavvia nuovo tipo + start_idx = i + current_rain_type = get_precip_type(codes[i]) + + elif (not is_raining and in_rain) or (in_rain and i == len(times)-1): + in_rain = False + end_idx = i if not is_raining else i + 1 + + block_precip = precip[start_idx:end_idx] + tot_mm = sum(block_precip) + + if tot_mm > 0: + max_prob = max(probs[start_idx:end_idx]) + start_time = times[start_idx].split("T")[1][:5] + end_time = times[end_idx-1].split("T")[1][:5] + avg_intensity = tot_mm / len(block_precip) + + events.append( + f"{current_rain_type} ({get_intensity_label(avg_intensity)}):\n" + f" 🕒 {start_time}-{end_time} | 💧 {tot_mm:.1f}mm | Prob: {max_prob}%" + ) + + # ========================================== + # 3. LIVELLO VENTO + # ========================================== + max_wind = max(winds) + if max_wind > SOGLIA_VENTO_KMH: + peak_idx = winds.index(max_wind) + peak_time = times[peak_idx].split("T")[1][:5] + events.append(f"💨 Vento Forte: Picco {max_wind}km/h alle {peak_time}") + + return events + +def format_report(data, location_name): + hourly = data['hourly'] + daily = data['daily'] + + msg = f"🌍 METEO ALERT: {location_name.upper()}\n" + msg += f"📡 Modelli: AROME/ICON HD\n\n" + + daily_map = defaultdict(list) + for i, t in enumerate(hourly['time']): + daily_map[t.split("T")[0]].append(i) + + count = 0 + for day_date, indices in daily_map.items(): + if count >= 7: break + + d_times = [hourly['time'][i] for i in indices] + d_codes = [hourly['weathercode'][i] for i in indices] + d_probs = [hourly['precipitation_probability'][i] for i in indices] + d_precip = [hourly['precipitation'][i] for i in indices] + d_winds = [hourly['windspeed_10m'][i] for i in indices] + d_temps = [hourly['temperature_2m'][i] for i in indices] + d_dews = [hourly['dewpoint_2m'][i] for i in indices] + + try: + t_min = daily['temperature_2m_min'][count] + t_max = daily['temperature_2m_max'][count] + except: + t_min, t_max = min(d_temps), max(d_temps) + + events_list = analyze_daily_events(d_times, d_codes, d_probs, d_precip, d_winds, d_temps, d_dews) + + dt = datetime.datetime.strptime(day_date, "%Y-%m-%d") + day_str = dt.strftime("%a %d/%m") + + msg += f"📅 {day_str} 🌡️ {t_min:.0f}°/{t_max:.0f}°C\n" + + if events_list: + for ev in events_list: + msg += f" ➤ {ev}\n" + else: + msg += " ✅ Nessun fenomeno rilevante\n" + + msg += "\n" + count += 1 + + return msg + +def send_telegram(text, chat_id, token): + requests.post(f"https://api.telegram.org/bot{token}/sendMessage", + json={"chat_id": chat_id, "text": text, "parse_mode": "HTML"}) + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("query", nargs="?", default="casa") + parser.add_argument("--chat_id") + parser.add_argument("--debug", action="store_true") + args = parser.parse_args() + + token = get_bot_token() + dest_chat = args.chat_id if args.chat_id and not args.debug else ADMIN_CHAT_ID + + lat, lon, name = get_coordinates(args.query) + if not lat: return send_telegram(f"❌ '{args.query}' non trovato.", dest_chat, token) + + data = get_weather(lat, lon) + if not data: return send_telegram("❌ Errore dati meteo.", dest_chat, token) + + send_telegram(format_report(data, name), dest_chat, token) + +if __name__ == "__main__": + main()