Delete services/telegram-bot/daily_flight_report.py

This commit is contained in:
2025-12-28 19:07:55 +01:00
parent c89436c26b
commit 526f2152b3

View File

@@ -1,189 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import datetime
import requests
import logging
from zoneinfo import ZoneInfo
from dateutil import parser
# --- CONFIGURAZIONE ---
# 1. Chiave API Voli (Tua)
RAPIDAPI_KEY = "841975fb1fmshd139bc1a12cd454p100114jsn8acf1ccede63"
# 2. Chat ID (Esclusivo per te)
CHAT_IDS = ["64463169"]
# 3. Lettura Token da file /etc
TOKEN_FILE = "/etc/telegram_dpc_bot_token"
def get_bot_token():
try:
if os.path.exists(TOKEN_FILE):
with open(TOKEN_FILE, "r", encoding="utf-8") as f:
return f.read().strip()
except Exception as e:
print(f"Errore lettura file token {TOKEN_FILE}: {e}")
return None
BOT_TOKEN = get_bot_token()
TZ = ZoneInfo("Europe/Rome")
# --- FUNZIONI ---
def fetch_segment(iata, start_ts, end_ts):
"""Esegue la chiamata API per un segmento orario specifico"""
url = f"https://aerodatabox.p.rapidapi.com/flights/airports/iata/{iata}/{start_ts}/{end_ts}"
querystring = {
"withLeg": "true",
"direction": "Both",
"withCancelled": "false",
"withCodeshared": "false",
"withCargo": "false",
"withPrivate": "false"
}
headers = {
"X-RapidAPI-Key": RAPIDAPI_KEY,
"X-RapidAPI-Host": "aerodatabox.p.rapidapi.com"
}
try:
response = requests.get(url, headers=headers, params=querystring, timeout=15)
if response.status_code == 204: return [] # Nessun volo in questa fascia
if response.status_code != 200:
print(f"DEBUG {iata} Error {response.status_code}: {response.text}") # Debug approfondito
return None # Segnala errore
return response.json()
except Exception as e:
print(f"Exception fetching {iata}: {e}")
return None
def get_flights(iata, name):
"""Scarica arrivi e partenze dividendo la giornata in 2 blocchi da 12h"""
now = datetime.datetime.now(TZ)
date_str = now.strftime("%Y-%m-%d")
# AeroDataBox limita le richieste a finestre di 12 ore.
# Dividiamo in Mattina (00:00-11:59) e Pomeriggio (12:00-23:59)
segments = [
(f"{date_str}T00:00", f"{date_str}T11:59"),
(f"{date_str}T12:00", f"{date_str}T23:59")
]
raw_arrivals = []
raw_departures = []
error_occurred = False
for start_t, end_t in segments:
data = fetch_segment(iata, start_t, end_t)
if data is None:
error_occurred = True
continue
if isinstance(data, dict):
raw_arrivals.extend(data.get("arrivals", []))
raw_departures.extend(data.get("departures", []))
if error_occurred and not raw_arrivals and not raw_departures:
return f"\n⚠️ *{name} ({iata})* - Errore API (Vedi log)"
flight_list = []
# Processa Arrivi (Deduplica basata su orario+numero per sicurezza)
seen = set()
for f in raw_arrivals:
try:
time_local = f["movement"]["scheduledTimeLocal"]
dt = parser.isoparse(time_local)
flight_no = f.get("number", "")
uid = f"{dt}_{flight_no}_ARR"
if uid in seen: continue
seen.add(uid)
airline = f.get("airline", {}).get("name", "Unknown")
origin = f.get("movement", {}).get("airport", {}).get("name", "Unknown")
flight_list.append({
"time": dt,
"text": f"🛬 `{dt.strftime('%H:%M')}` da {origin}\n └ *{airline}* ({flight_no})"
})
except: continue
# Processa Partenze
for f in raw_departures:
try:
time_local = f["movement"]["scheduledTimeLocal"]
dt = parser.isoparse(time_local)
flight_no = f.get("number", "")
uid = f"{dt}_{flight_no}_DEP"
if uid in seen: continue
seen.add(uid)
airline = f.get("airline", {}).get("name", "Unknown")
dest = f.get("movement", {}).get("airport", {}).get("name", "Unknown")
flight_list.append({
"time": dt,
"text": f"🛫 `{dt.strftime('%H:%M')}` per {dest}\n └ *{airline}* ({flight_no})"
})
except: continue
if not flight_list:
return f"\n✈️ *{name} ({iata})*\n_Nessun volo programmato oggi._"
# Ordina per orario
flight_list.sort(key=lambda x: x["time"])
msg = f"\n✈️ *{name} ({iata})*"
for item in flight_list:
msg += f"\n{item['text']}"
return msg
def send_telegram(text):
if not BOT_TOKEN:
print(f"❌ ERRORE CRITICO: Token non trovato in {TOKEN_FILE}")
return
url = f"https://api.telegram.org/bot{BOT_TOKEN}/sendMessage"
for chat_id in CHAT_IDS:
payload = {
"chat_id": chat_id,
"text": text,
"parse_mode": "Markdown",
"disable_web_page_preview": True
}
try:
requests.post(url, json=payload, timeout=10)
print(f"Messaggio inviato a {chat_id}")
except Exception as e:
print(f"Errore invio a {chat_id}: {e}")
# --- MAIN ---
def main():
print("--- Elaborazione Voli (Split 12h) ---")
today = datetime.datetime.now(TZ).strftime("%d/%m/%Y")
report = f"📆 *PROGRAMMA VOLI {today}*\n"
report += get_flights("RMI", "Rimini")
report += "\n"
report += get_flights("FRL", "Forlì")
print(report)
send_telegram(report)
if __name__ == "__main__":
main()