apiVersion: v1 kind: PersistentVolumeClaim metadata: name: glance-helper-data namespace: glance-system spec: accessModes: - ReadWriteOnce resources: requests: storage: 200Mi --- apiVersion: v1 kind: ConfigMap metadata: name: glance-helper-app namespace: glance-system data: app.py: |- import os import time import re from typing import List, Dict, Any, Optional import requests from bs4 import BeautifulSoup from fastapi import FastAPI, Response from fastapi.middleware.cors import CORSMiddleware from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST APP = FastAPI() # Add CORS middleware to allow cross-origin requests from Glance dashboards APP.add_middleware( CORSMiddleware, allow_origins=["https://kisfenyo.dooplex.hu", "https://orsi.dooplex.hu", "https://glance-helper.dooplex.hu"], allow_credentials=True, allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"], allow_headers=["*"], ) # ================================ # Időkép configuration # ================================ IDOKEP_URL = os.getenv("IDOKEP_URL", "https://www.idokep.hu/idojaras/Budapest%20VII.%20ker") PLACE_NAME = os.getenv("PLACE_NAME", "Budapest VII. ker") SOURCE_NAME = "Időkép" UA = os.getenv("USER_AGENT", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome Safari") SCRAPES = Counter("idokep_scrapes_total", "Total Időkép scrapes", ["place", "status"]) SCRAPE_SECONDS = Histogram("idokep_scrape_seconds", "Időkép scrape duration in seconds", ["place"]) ICON_CONDITIONS = { # Daytime (0xx) "010": "Derült", "011": "Pára", "021": "Gyengén felhős", "022": "Közepesen felhős", "023": "Erősen felhős", "030": "Borult", "040": "Ködszitálás", "041": "Szitálás", "042": "Gyenge eső", "043": "Eső", "043s": "Eső viharos széllel", "051": "Havas eső", "052": "Ónos eső", "061": "Havazás", "062": "Havazás", "081": "Zápor", "083": "Hózápor", "088": "Jégeső", "090": "Zivatar", "092": "Száraz zivatar", "100": "Köd", "101": "Porvihar", # Nighttime (3xx) - same conditions "310": "Derült", "311": "Pára", "321": "Gyengén felhős", "322": "Közepesen felhős", "323": "Erősen felhős", "330": "Borult", "340": "Ködszitálás", "341": "Szitálás", "342": "Gyenge eső", "343": "Eső", "343s": "Eső viharos széllel", "351": "Havas eső", "352": "Ónos eső", "361": "Havazás", "362": "Havazás", "381": "Zápor", "383": "Hózápor", "388": "Jégeső", "390": "Zivatar", "392": "Száraz zivatar", "400": "Köd", "401": "Porvihar", } def _icon_condition(icon_url: str) -> str: """Extract weather condition from icon URL.""" if not icon_url: return "" # URL format: https://www.idokep.hu/assets/forecast-icons/XXX.svg (or .png) import re match = re.search(r'/(\d{3}s?)\.(svg|png)', icon_url) if match: code = match.group(1) return ICON_CONDITIONS.get(code, "") return "" def _abs_url(url): return "https://www.idokep.hu" + url if url and not url.startswith("http") else url def _to_float(s): try: return float(s.strip().replace("˚C", "").replace("°C", "").replace("°", "").replace(",", ".")) except: return None def _calculate_gradient_data(daily_items): if not daily_items: return daily_items # 1. Global Min/Max for the week mins = [d["tmin_c"] for d in daily_items if d["tmin_c"] is not None] maxs = [d["tmax_c"] for d in daily_items if d["tmax_c"] is not None] if not mins or not maxs: return daily_items g_min = min(mins) - 1.0 g_max = max(maxs) + 1.0 span = g_max - g_min if span <= 0: span = 1.0 # 2. Helper to get percentage def get_pct(t): return (t - g_min) / span * 100.0 # 3. Calculate stops (floats) s_wht = get_pct(-10) s_blu = get_pct(0) s_pur = get_pct(15) s_pnk = get_pct(25) s_red = get_pct(35) # 4. Inject individual floats into the dictionary for d in daily_items: tmin, tmax = d["tmin_c"], d["tmax_c"] if tmin is None or tmax is None: continue w_pct = (tmax - tmin) / span * 100.0 if w_pct < 2: w_pct = 2 l_pct = (tmin - g_min) / span * 100.0 inner_w = (100.0 / w_pct) * 100.0 inner_ml = -(l_pct / w_pct) * 100.0 # STORE AS FLOATS (No "%" symbol here) d["c_l"] = l_pct d["c_w"] = w_pct d["c_gw"] = inner_w d["c_ml"] = inner_ml d["c_s1"] = s_wht # White d["c_s2"] = s_blu # Blue d["c_s3"] = s_pur # Purple d["c_s4"] = s_pnk # Pink d["c_s5"] = s_red # Red return daily_items def scrape() -> Dict[str, Any]: try: r = requests.get(IDOKEP_URL, headers={"User-Agent": UA}, timeout=15) r.raise_for_status() soup = BeautifulSoup(r.text, "html.parser") # Current cur_temp_el = soup.select_one(".current-temperature") cur_cond_el = soup.select_one(".current-weather") cur_icon_el = soup.select_one(".forecast-bigicon") # Hourly hourly = [] for card in soup.select(".ik.hourly-forecast-card")[:8]: t = card.select_one(".ik.hourly-forecast-hour") temp = card.select_one(".ik.temperature-circled") icon = card.select_one("img.ik.forecast-icon") if t and temp: hourly.append({ "time": t.get_text(strip=True), "temp_c": _to_float(temp.get_text(strip=True)), "icon_url": _abs_url(icon.get("src")), "condition": _icon_condition(_abs_url(icon.get("src"))) }) # Daily daily = [] for col in soup.select(".ik.daily-forecast-container .ik.dailyForecastCol")[:15]: dow = col.select_one(".ik.dfDay") daynum = col.select_one(".ik.dfDayNum") icon = col.select_one("img.ik.forecast-icon") tmax = col.select_one("div.ik.max") tmin = col.select_one("div.ik.min") # Fallback for holiday layout v_tmax, v_tmin = None, None if tmax and tmin: v_tmax = _to_float(tmax.get_text(strip=True)) v_tmin = _to_float(tmin.get_text(strip=True)) else: vals = [a.get_text(strip=True) for a in col.select(".ik.min-max-container a")] vals = [v for v in vals if re.fullmatch(r"-?\d+", v)] if len(vals) >= 2: v_tmax, v_tmin = _to_float(vals[0]), _to_float(vals[1]) if v_tmax is not None and v_tmin is not None: icon_url = _abs_url(icon.get("src") if icon else None) daily.append({ "daynum": daynum.get_text(strip=True) if daynum else "", "dow": dow.get_text(strip=True) if dow else "", "tmin_c": v_tmin, "tmax_c": v_tmax, "icon_url": _abs_url(icon.get("src") if icon else None), "condition": _icon_condition(icon_url), }) daily = _calculate_gradient_data(daily[:5]) return { "source": {"name": SOURCE_NAME, "url": IDOKEP_URL}, "location": {"name": PLACE_NAME}, "current": { "temp_c": _to_float(cur_temp_el.get_text(strip=True)) if cur_temp_el else 0, "condition": cur_cond_el.get_text(strip=True) if cur_cond_el else "", "icon_url": _abs_url(cur_icon_el.get("src")) if cur_icon_el else "" }, "hourly": hourly, "daily": daily, "fetched_at_unix": int(time.time()), } except Exception as e: print(f"Scrape error: {e}") raise @APP.get("/api") def api(): status = "ok" try: data = scrape() except: status = "error" SCRAPES.labels(place=PLACE_NAME, status=status).inc() raise SCRAPES.labels(place=PLACE_NAME, status=status).inc() import json return Response(content=json.dumps(data, ensure_ascii=False), media_type="application/json; charset=utf-8") @APP.get("/metrics") def metrics(): return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST) # ================================ # Tandoor "Meal of the Day" - Enhanced Version # ================================ from datetime import datetime, timezone, timedelta from zoneinfo import ZoneInfo from urllib.parse import urlencode from fastapi import HTTPException, Query from fastapi.responses import RedirectResponse import json import random from pathlib import Path TANDOOR_INTERNAL_URL = os.getenv("TANDOOR_INTERNAL_URL", "").rstrip("/") TANDOOR_PUBLIC_URL = os.getenv("TANDOOR_PUBLIC_URL", "").rstrip("/") GLANCE_HELPER_PUBLIC_URL = os.getenv("GLANCE_HELPER_PUBLIC_URL", "").rstrip("/") GLANCE_HELPER_KEY = os.getenv("GLANCE_HELPER_KEY", "") DATA_DIR = Path(os.getenv("DATA_DIR", "/data")) DATA_DIR.mkdir(parents=True, exist_ok=True) COOKED_PATH = DATA_DIR / "tandoor-cooked.json" PICKS_PATH = DATA_DIR / "tandoor-picks.json" # Cooldown: don't suggest recipes cooked within this many days TANDOOR_COOLDOWN_DAYS = int(os.getenv("TANDOOR_COOLDOWN_DAYS", "14")) def _today_str() -> str: """YYYY-MM-DD in Europe/Budapest if tzdata exists, else UTC.""" try: tz = ZoneInfo("Europe/Budapest") return datetime.now(tz).strftime("%Y-%m-%d") except Exception: return datetime.now(timezone.utc).strftime("%Y-%m-%d") def _load_json(path: Path, default): try: if path.exists(): return json.loads(path.read_text(encoding="utf-8")) except Exception: pass return default def _save_json(path: Path, obj) -> None: tmp = path.with_suffix(path.suffix + ".tmp") tmp.write_text(json.dumps(obj, ensure_ascii=False, indent=2), encoding="utf-8") tmp.replace(path) def _tandoor_headers(): token = os.getenv("TANDOOR_TOKEN", "") if not token: raise HTTPException(status_code=500, detail="TANDOOR_TOKEN is not set") return {"Authorization": f"Bearer {token}", "Accept": "application/json"} def _rewrite_to_public(url: str) -> str: """Turn internal URLs into public ones (images/links).""" if not url: return url if TANDOOR_PUBLIC_URL and TANDOOR_INTERNAL_URL and url.startswith(TANDOOR_INTERNAL_URL): return TANDOOR_PUBLIC_URL + url[len(TANDOOR_INTERNAL_URL):] return url def _fetch_all_recipes() -> list[dict]: if not TANDOOR_INTERNAL_URL: raise HTTPException(status_code=500, detail="TANDOOR_INTERNAL_URL is not set") url = f"{TANDOOR_INTERNAL_URL}/api/recipe/?page_size=200" out = [] for _ in range(100): # safety r = requests.get(url, headers=_tandoor_headers(), timeout=15) if r.status_code != 200: raise HTTPException(status_code=502, detail=f"Tandoor returned {r.status_code}: {r.text[:200]}") j = r.json() results = j.get("results", []) out.extend(results) url = j.get("next") if not url: break return out # ================================ # Cooked History Management # ================================ # Structure: {"": ["2026-01-22", "2026-01-15", ...], ...} def _load_cooked_history() -> dict[str, list[str]]: """Load cooked history. Returns {recipe_id_str: [date_str, ...]}""" return _load_json(COOKED_PATH, {}) def _save_cooked_history(history: dict[str, list[str]]) -> None: _save_json(COOKED_PATH, history) def _get_cooked_count(history: dict, recipe_id: int) -> int: """Get total times a recipe has been cooked.""" return len(history.get(str(recipe_id), [])) def _get_recently_cooked_ids(history: dict, days: int) -> set[int]: """Get recipe IDs cooked within the last N days.""" if days <= 0: return set() try: tz = ZoneInfo("Europe/Budapest") except Exception: tz = timezone.utc cutoff = (datetime.now(tz) - timedelta(days=days)).strftime("%Y-%m-%d") recent = set() for rid_str, dates in history.items(): for d in dates: if d >= cutoff: recent.add(int(rid_str)) break return recent def _record_cooked(recipe_id: int) -> dict[str, list[str]]: """Record that a recipe was cooked today. Returns updated history.""" history = _load_cooked_history() rid_str = str(recipe_id) today = _today_str() if rid_str not in history: history[rid_str] = [] # Avoid duplicate entries for same day if today not in history[rid_str]: history[rid_str].append(today) # Keep sorted (newest first) for easier reading history[rid_str] = sorted(history[rid_str], reverse=True) _save_cooked_history(history) return history # ================================ # Daily Picks Logic # ================================ def _compute_daily_picks(count: int, cooldown_days: int) -> dict: today = _today_str() history = _load_cooked_history() recently_cooked = _get_recently_cooked_ids(history, cooldown_days) picks_doc = _load_json(PICKS_PATH, {}) # Check if we need to regenerate picks needs_regen = False if picks_doc.get("date") != today: needs_regen = True elif picks_doc.get("count") != count: needs_regen = True elif picks_doc.get("cooldown") != cooldown_days: needs_regen = True else: # Check if we can add more picks (new recipes added mid-day) current_ids = set(picks_doc.get("ids", [])) if len(current_ids) < count: recipes = _fetch_all_recipes() available = [r.get("id") for r in recipes if r.get("id") is not None and r.get("id") not in current_ids and r.get("id") not in recently_cooked] if available: needs_regen = True if not needs_regen: return picks_doc # Generate new picks recipes = _fetch_all_recipes() available = [r for r in recipes if r.get("id") is not None and r.get("id") not in recently_cooked] if not available: # Fallback: if everything is in cooldown, ignore cooldown available = recipes[:] chosen = random.sample(available, k=min(count, len(available))) if available else [] picks_doc = { "date": today, "count": count, "cooldown": cooldown_days, "ids": [r.get("id") for r in chosen if r.get("id") is not None] } _save_json(PICKS_PATH, picks_doc) return picks_doc def _build_items_from_ids(ids: list[int], history: dict) -> tuple[list[dict], int]: recipes = _fetch_all_recipes() by_id = {r.get("id"): r for r in recipes if r.get("id") is not None} items = [] for rid in ids: r = by_id.get(rid) if not r: continue img = _rewrite_to_public(r.get("image") or "") url = f"{TANDOOR_PUBLIC_URL}/recipe/{rid}" if TANDOOR_PUBLIC_URL else "" cook_params = {"id": rid} if GLANCE_HELPER_KEY: cook_params["key"] = GLANCE_HELPER_KEY cook_url = f"{GLANCE_HELPER_PUBLIC_URL}/tandoor/cook?{urlencode(cook_params)}" if GLANCE_HELPER_PUBLIC_URL else "" items.append({ "id": rid, "name": r.get("name") or "", "image": img, "url": url, "cook_url": cook_url, "cooked_count": _get_cooked_count(history, rid) }) return items, len(recipes) @APP.get("/tandoor/daily") def tandoor_daily(count: int = Query(3, ge=1, le=10), cooldown: int = Query(None, ge=0, le=365)): """ Get daily meal suggestions. - count: Number of suggestions (1-10, default 3) - cooldown: Days to exclude recently cooked meals (0-365, default from env TANDOOR_COOLDOWN_DAYS or 14) """ cooldown_days = cooldown if cooldown is not None else TANDOOR_COOLDOWN_DAYS picks_doc = _compute_daily_picks(count, cooldown_days) ids = picks_doc.get("ids", []) history = _load_cooked_history() items, total = _build_items_from_ids(ids, history) return { "date": picks_doc.get("date"), "total_recipes": total, "cooldown_days": cooldown_days, "items": items } @APP.get("/tandoor/cook") def tandoor_cook(id: int, key: str | None = None, redirect: str | None = None): """Mark a recipe as cooked today.""" if GLANCE_HELPER_KEY and key != GLANCE_HELPER_KEY: raise HTTPException(status_code=403, detail="Invalid key") today = _today_str() # Record in permanent history history = _record_cooked(id) recently_cooked = _get_recently_cooked_ids(history, TANDOOR_COOLDOWN_DAYS) # Remove from today's picks and try to refill picks = _load_json(PICKS_PATH, {}) if picks.get("date") == today and isinstance(picks.get("ids"), list): ids = [x for x in picks["ids"] if x != id] target = int(picks.get("count") or len(ids)) if len(ids) < target: recipes = _fetch_all_recipes() avoid = set(ids) | recently_cooked candidates = [r.get("id") for r in recipes if r.get("id") is not None and r.get("id") not in avoid] if candidates: ids.append(random.choice(candidates)) picks["ids"] = ids[:target] _save_json(PICKS_PATH, picks) if redirect: return RedirectResponse(url=redirect, status_code=302) return { "ok": True, "date": today, "cooked_id": id, "cooked_total": _get_cooked_count(history, id) } @APP.get("/tandoor/history") def tandoor_history(): """Get full cooked history (for debugging/stats).""" history = _load_cooked_history() return { "recipes": { int(k): {"dates": v, "count": len(v)} for k, v in history.items() }, "total_cooks": sum(len(v) for v in history.values()) } # ================================ # Version Checker - Container Version Monitoring # ================================ # Scrapes version-checker Prometheus metrics, parses versions, # and returns properly sorted results with the NEWEST available version. VERSION_CHECKER_URL = os.getenv("VERSION_CHECKER_URL", "http://version-checker.version-checker-system.svc.cluster.local:8080/metrics") # Regex patterns for filtering (same as in Glance widget) VERSION_CHECKER_EXCLUDE_IMAGES = os.getenv("VERSION_CHECKER_EXCLUDE_IMAGES", r"(^|.*/)(busybox|redis|alpine|nginx|mariadb|mysql|valkey)$|^longhornio.*$|(^|.*/)postgres.*$|^registry\.k8s\.io/ingress-nginx/.*$" ) def _parse_semver(version: str) -> tuple: """ Parse version string into comparable tuple. Handles: v1.2.3, 1.2.3, v1.2.3-beta, 1.2.3-20251030, etc. Returns tuple that sorts correctly: (major, minor, patch, prerelease_flag, prerelease) """ if not version: return (0, 0, 0, 0, "") # Skip sha256 digests entirely if "sha256:" in version or version.startswith("sha"): return (0, 0, 0, 0, version) # Remove 'v' prefix v = version.lstrip("v") # Remove @sha256:... suffix if present (e.g., "1.2.3@sha256:abc...") if "@" in v: v = v.split("@")[0] # Split into version and prerelease/build metadata # Handle: 1.2.3-beta, 1.2.3-20251030, 1.2.3+build prerelease = "" if "-" in v: parts = v.split("-", 1) v = parts[0] prerelease = parts[1] elif "+" in v: parts = v.split("+", 1) v = parts[0] prerelease = parts[1] # Parse major.minor.patch segments = v.split(".") try: major = int(segments[0]) if len(segments) > 0 and segments[0].isdigit() else 0 minor = int(segments[1]) if len(segments) > 1 and segments[1].isdigit() else 0 patch = int(segments[2]) if len(segments) > 2 and segments[2].isdigit() else 0 except (ValueError, IndexError): major, minor, patch = 0, 0, 0 # For sorting: no prerelease > prerelease (1.2.3 > 1.2.3-beta) # prerelease_flag: 1 = no prerelease (higher), 0 = has prerelease (lower) prerelease_flag = 1 if not prerelease else 0 return (major, minor, patch, prerelease_flag, prerelease) def _compare_versions(v1: str, v2: str) -> int: """Compare two version strings. Returns: -1 if v1v2""" t1 = _parse_semver(v1) t2 = _parse_semver(v2) if t1 < t2: return -1 elif t1 > t2: return 1 return 0 def _parse_prometheus_metrics(text: str) -> list[dict]: """ Parse Prometheus metrics text format. Extracts version_checker_is_latest_version metrics. """ results = [] pattern = re.compile( r'version_checker_is_latest_version\{([^}]+)\}\s+(\d+(?:\.\d+)?)' ) for match in pattern.finditer(text): labels_str = match.group(1) value = float(match.group(2)) # Parse labels labels = {} # Handle labels like: container="foo",image="bar",current_version="1.0" label_pattern = re.compile(r'(\w+)="([^"]*)"') for label_match in label_pattern.finditer(labels_str): labels[label_match.group(1)] = label_match.group(2) results.append({ "labels": labels, "value": value # 1 = up to date, 0 = outdated }) return results def _fetch_version_checker_data() -> dict: """ Fetch and process version-checker metrics. Returns structured data with deduplicated images and newest versions. """ try: r = requests.get(VERSION_CHECKER_URL, timeout=30) r.raise_for_status() metrics = _parse_prometheus_metrics(r.text) except Exception as e: print(f"Version checker fetch error: {e}") raise HTTPException(status_code=502, detail=f"Failed to fetch version-checker: {e}") # Build exclude pattern exclude_pattern = re.compile(VERSION_CHECKER_EXCLUDE_IMAGES) if VERSION_CHECKER_EXCLUDE_IMAGES else None # Group metrics by image # Structure: {image: {"current": str, "latest_versions": [str], "is_latest": bool}} images: Dict[str, Dict] = {} for metric in metrics: labels = metric["labels"] container_type = labels.get("container_type", "") image = labels.get("image", "") current = labels.get("current_version", "") latest = labels.get("latest_version", "") is_latest = metric["value"] == 1 # Filter: only containers (not init containers for simplicity) if container_type != "container": continue # Filter: exclude patterns if exclude_pattern and exclude_pattern.search(image): continue # Filter: skip sha256 versions (can't compare meaningfully) if "sha256:" in current or "sha256:" in latest: continue # Initialize or update image entry if image not in images: images[image] = { "current_version": current, "latest_versions": [], "is_latest": True } # Track all latest versions reported (for outdated images) if not is_latest: images[image]["is_latest"] = False if latest and latest not in images[image]["latest_versions"]: images[image]["latest_versions"].append(latest) # Process: find the NEWEST version for each image up_to_date = [] outdated = [] for image, data in images.items(): current = data["current_version"] if data["is_latest"]: up_to_date.append({ "image": image, "current_version": current, "latest_version": current }) else: # Sort versions and pick the newest versions = data["latest_versions"] if versions: versions_sorted = sorted(versions, key=lambda v: _parse_semver(v), reverse=True) newest = versions_sorted[0] else: newest = current outdated.append({ "image": image, "current_version": current, "latest_version": newest, "all_newer_versions": len(versions) }) # Sort outdated by image name for consistent ordering outdated.sort(key=lambda x: x["image"]) up_to_date.sort(key=lambda x: x["image"]) return { "fetched_at_unix": int(time.time()), "summary": { "up_to_date": len(up_to_date), "outdated": len(outdated), "total": len(up_to_date) + len(outdated) }, "outdated": outdated, "up_to_date": up_to_date } @APP.get("/versions") def versions_api(): """ Returns container version status with properly sorted newest versions. Response format optimized for Glance custom-api widget. """ data = _fetch_version_checker_data() return Response( content=json.dumps(data, ensure_ascii=False), media_type="application/json; charset=utf-8" ) @APP.get("/versions/outdated") def versions_outdated(): """Returns only outdated images (for simpler widget).""" data = _fetch_version_checker_data() return Response( content=json.dumps({ "fetched_at_unix": data["fetched_at_unix"], "summary": data["summary"], "images": data["outdated"] }, ensure_ascii=False), media_type="application/json; charset=utf-8" ) # ================================ # Google Calendar iCal Integration # ================================ from icalendar import Calendar as ICalCalendar from dateutil.rrule import rrulestr from dateutil import parser as dateutil_parser # Calendar configuration - loaded from environment CALENDAR_ICAL_URLS = os.getenv("CALENDAR_ICAL_URLS", "") # JSON: {"name": "url", ...} def _format_relative_date(dt: datetime, tz) -> str: """Format date as 'Today', 'Tomorrow', or 'Mon 24' style.""" now = datetime.now(tz) today = now.date() tomorrow = today + timedelta(days=1) event_date = dt.date() if event_date == today: return "Today" elif event_date == tomorrow: return "Tomorrow" else: return dt.strftime("%b %d") def _parse_ical_urls() -> dict[str, str]: """Parse CALENDAR_ICAL_URLS environment variable.""" if not CALENDAR_ICAL_URLS: return {} try: return json.loads(CALENDAR_ICAL_URLS) except Exception as e: print(f"Error parsing CALENDAR_ICAL_URLS: {e}") return {} def _fetch_ical(url: str, timeout: int = 15) -> str: """Fetch iCal content from URL.""" try: r = requests.get(url, timeout=timeout, headers={"User-Agent": UA}) r.raise_for_status() return r.text except Exception as e: print(f"Error fetching iCal from {url}: {e}") return "" def _parse_ical_events(ical_text: str, calendar_name: str, days_ahead: int = 30) -> list[dict]: """ Parse iCal text and return upcoming events. Handles both single events and recurring events. """ events = [] if not ical_text: return events try: tz = ZoneInfo("Europe/Budapest") except Exception: tz = timezone.utc now = datetime.now(tz) cutoff = now + timedelta(days=days_ahead) try: cal = ICalCalendar.from_ical(ical_text) for component in cal.walk(): if component.name != "VEVENT": continue summary = str(component.get("SUMMARY", "No title")) description = str(component.get("DESCRIPTION", "")) if component.get("DESCRIPTION") else "" location = str(component.get("LOCATION", "")) if component.get("LOCATION") else "" uid = str(component.get("UID", "")) # Get start and end times dtstart = component.get("DTSTART") dtend = component.get("DTEND") if not dtstart: continue dtstart_val = dtstart.dt dtend_val = dtend.dt if dtend else None # Handle all-day events (date vs datetime) is_all_day = not isinstance(dtstart_val, datetime) if is_all_day: # All-day event - convert date to datetime dtstart_val = datetime.combine(dtstart_val, datetime.min.time(), tzinfo=tz) if dtend_val and not isinstance(dtend_val, datetime): dtend_val = datetime.combine(dtend_val, datetime.min.time(), tzinfo=tz) else: # Make sure datetime is timezone-aware if dtstart_val.tzinfo is None: dtstart_val = dtstart_val.replace(tzinfo=tz) else: dtstart_val = dtstart_val.astimezone(tz) if dtend_val: if dtend_val.tzinfo is None: dtend_val = dtend_val.replace(tzinfo=tz) else: dtend_val = dtend_val.astimezone(tz) # Check for recurring events rrule = component.get("RRULE") if rrule: # Handle recurring events try: rrule_str = rrule.to_ical().decode("utf-8") rule = rrulestr(rrule_str, dtstart=dtstart_val) # Get occurrences within our window occurrences = list(rule.between(now, cutoff, inc=True))[:10] # Limit to 10 occurrences for occ in occurrences: if occ.tzinfo is None: occ = occ.replace(tzinfo=tz) # Calculate end time for this occurrence if dtend_val and dtstart_val: duration = dtend_val - dtstart_val occ_end = occ + duration else: occ_end = None events.append({ "title": summary, "description": description, "location": location, "calendar": calendar_name, "start": occ.isoformat(), "start_unix": int(occ.timestamp()), "start_date": occ.strftime("%b %d"), "start_date_relative": _format_relative_date(occ, tz), "start_time": occ.strftime("%H:%M"), "start_weekday": occ.strftime("%a"), "end": occ_end.isoformat() if occ_end else None, "end_unix": int(occ_end.timestamp()) if occ_end else None, "is_all_day": is_all_day, "uid": uid }) except Exception as e: print(f"Error parsing RRULE for event '{summary}': {e}") # Fall back to single event if now <= dtstart_val <= cutoff: events.append({ "title": summary, "description": description, "location": location, "calendar": calendar_name, "start": dtstart_val.isoformat(), "start_unix": int(dtstart_val.timestamp()), "start_date": dtstart_val.strftime("%b %d"), "start_date_relative": _format_relative_date(dtstart_val, tz), "start_time": dtstart_val.strftime("%H:%M"), "start_weekday": dtstart_val.strftime("%a"), "end": dtend_val.isoformat() if dtend_val else None, "end_unix": int(dtend_val.timestamp()) if dtend_val else None, "is_all_day": is_all_day, "uid": uid }) else: # Single event if now <= dtstart_val <= cutoff: events.append({ "title": summary, "description": description, "location": location, "calendar": calendar_name, "start": dtstart_val.isoformat(), "start_unix": int(dtstart_val.timestamp()), "start_date": dtstart_val.strftime("%b %d"), "start_date_relative": _format_relative_date(dtstart_val, tz), "start_time": dtstart_val.strftime("%H:%M"), "start_weekday": dtstart_val.strftime("%a"), "end": dtend_val.isoformat() if dtend_val else None, "end_unix": int(dtend_val.timestamp()) if dtend_val else None, "is_all_day": is_all_day, "uid": uid }) except Exception as e: print(f"Error parsing iCal for {calendar_name}: {e}") return events @APP.get("/calendar/events") def calendar_events_api( count: int = Query(default=5, ge=1, le=50, description="Number of events to return"), days: int = Query(default=30, ge=1, le=365, description="Days to look ahead"), calendars: str = Query(default="", description="Comma-separated list of calendar names to include (empty = all)") ): """ Returns upcoming calendar events from configured iCal feeds. Merges multiple calendars and sorts by start time. Use 'calendars' parameter to filter specific calendars (e.g., calendars=Családi,Órák) """ all_calendars = _parse_ical_urls() if not all_calendars: return Response( content=json.dumps({ "error": "No calendars configured", "events": [], "count": 0, "fetched_at_unix": int(time.time()) }, ensure_ascii=False), media_type="application/json; charset=utf-8" ) # Filter calendars if specified if calendars.strip(): requested = [c.strip() for c in calendars.split(",") if c.strip()] filtered_calendars = {k: v for k, v in all_calendars.items() if k in requested} else: filtered_calendars = all_calendars all_events = [] calendar_status = {} for name, url in filtered_calendars.items(): ical_text = _fetch_ical(url) if ical_text: events = _parse_ical_events(ical_text, name, days) all_events.extend(events) calendar_status[name] = {"status": "ok", "events": len(events)} else: calendar_status[name] = {"status": "error", "events": 0} # Sort by start time and limit all_events.sort(key=lambda e: e["start_unix"]) limited_events = all_events[:count] return Response( content=json.dumps({ "events": limited_events, "count": len(limited_events), "total_available": len(all_events), "calendars": calendar_status, "fetched_at_unix": int(time.time()) }, ensure_ascii=False), media_type="application/json; charset=utf-8" ) # ================================ # User Data Management (Notes, Todos, Motivation) # ================================ # File structure per user: /data/userdata-{username}.json # { # "notes": "free text...", # "todos": [{"id": "uuid", "text": "...", "done": false, "created": "..."}], # "motivation": ["quote1", "quote2", ...] # } import uuid as uuid_lib from pydantic import BaseModel from typing import Optional, List class TodoItem(BaseModel): text: str done: bool = False class TodoSyncRequest(BaseModel): """Request body for syncing todos.""" todos: list modified: str # Client's last modified timestamp class MotivationItem(BaseModel): text: str class NotesUpdate(BaseModel): content: str def _userdata_path(user: str) -> Path: """Get the path to user data file.""" # Sanitize username to prevent path traversal safe_user = re.sub(r'[^a-zA-Z0-9_-]', '', user) if not safe_user: raise HTTPException(status_code=400, detail="Invalid username") return DATA_DIR / f"userdata-{safe_user}.json" def _load_userdata(user: str) -> dict: """Load user data or return default structure.""" default = { "notes": "", "todos": [], "todos_modified": None, # ISO timestamp of last todos modification "motivation": [ "Believe in yourself!", "Every day is a new opportunity.", "You've got this!", "Small steps lead to big changes.", "Stay focused, stay positive." ] } path = _userdata_path(user) data = _load_json(path, default) # Ensure all keys exist for key in default: if key not in data: data[key] = default[key] return data def _save_userdata(user: str, data: dict) -> None: """Save user data to file.""" path = _userdata_path(user) _save_json(path, data) def _verify_key(key: str = Query(default="")): """Verify API key for write operations.""" if not GLANCE_HELPER_KEY: return True # No key configured, allow all if key != GLANCE_HELPER_KEY: raise HTTPException(status_code=403, detail="Invalid API key") return True # ========== GET ALL USER DATA ========== @APP.get("/userdata/{user}") def get_userdata(user: str): """Get all user data (notes, todos, motivation).""" data = _load_userdata(user) return Response( content=json.dumps(data, ensure_ascii=False), media_type="application/json; charset=utf-8" ) # ========== NOTES ========== @APP.get("/userdata/{user}/notes") def get_notes(user: str): """Get user notes.""" data = _load_userdata(user) return Response( content=json.dumps({"notes": data.get("notes", "")}, ensure_ascii=False), media_type="application/json; charset=utf-8" ) @APP.post("/userdata/{user}/notes") def update_notes(user: str, body: NotesUpdate, key: str = Query(default="")): """Update user notes.""" _verify_key(key) data = _load_userdata(user) data["notes"] = body.content _save_userdata(user, data) return Response( content=json.dumps({"success": True, "notes": data["notes"]}, ensure_ascii=False), media_type="application/json; charset=utf-8" ) # ========== TODOS ========== def _update_todos_modified(data: dict) -> str: """Update the todos_modified timestamp and return it.""" ts = datetime.now(timezone.utc).isoformat() data["todos_modified"] = ts return ts @APP.get("/userdata/{user}/todos") def get_todos(user: str): """Get user todos with modification timestamp for sync.""" data = _load_userdata(user) return Response( content=json.dumps({ "todos": data.get("todos", []), "count": len(data.get("todos", [])), "modified": data.get("todos_modified") }, ensure_ascii=False), media_type="application/json; charset=utf-8" ) @APP.post("/userdata/{user}/todos/sync") def sync_todos(user: str, body: TodoSyncRequest, key: str = Query(default="")): """ Sync todos between client and server. - If client's modified > server's modified: update server with client data - If server's modified > client's modified: return server data (client should update) - Returns current server state either way """ _verify_key(key) data = _load_userdata(user) server_modified = data.get("todos_modified") client_modified = body.modified # Determine who has newer data # If server has no timestamp, client data is newer # If client timestamp > server timestamp, client is newer client_is_newer = False if not server_modified: client_is_newer = True elif client_modified and client_modified > server_modified: client_is_newer = True if client_is_newer: # Update server with client data data["todos"] = body.todos new_ts = _update_todos_modified(data) _save_userdata(user, data) return Response( content=json.dumps({ "action": "uploaded", "todos": data["todos"], "modified": new_ts, "count": len(data["todos"]) }, ensure_ascii=False), media_type="application/json; charset=utf-8" ) else: # Server has newer or same data, client should download return Response( content=json.dumps({ "action": "downloaded", "todos": data.get("todos", []), "modified": server_modified, "count": len(data.get("todos", [])) }, ensure_ascii=False), media_type="application/json; charset=utf-8" ) @APP.post("/userdata/{user}/todos") def add_todo(user: str, body: TodoItem, key: str = Query(default="")): """Add a new todo item.""" _verify_key(key) data = _load_userdata(user) new_todo = { "id": str(uuid_lib.uuid4())[:8], "text": body.text.strip(), "done": body.done, "created": datetime.now(timezone.utc).isoformat() } if not data.get("todos"): data["todos"] = [] data["todos"].append(new_todo) new_ts = _update_todos_modified(data) _save_userdata(user, data) return Response( content=json.dumps({"success": True, "todo": new_todo, "modified": new_ts}, ensure_ascii=False), media_type="application/json; charset=utf-8" ) @APP.put("/userdata/{user}/todos/{todo_id}") def update_todo(user: str, todo_id: str, body: TodoItem, key: str = Query(default="")): """Update a todo item (toggle done, update text).""" _verify_key(key) data = _load_userdata(user) for todo in data.get("todos", []): if todo.get("id") == todo_id: todo["text"] = body.text.strip() todo["done"] = body.done new_ts = _update_todos_modified(data) _save_userdata(user, data) return Response( content=json.dumps({"success": True, "todo": todo, "modified": new_ts}, ensure_ascii=False), media_type="application/json; charset=utf-8" ) raise HTTPException(status_code=404, detail="Todo not found") @APP.delete("/userdata/{user}/todos/{todo_id}") def delete_todo(user: str, todo_id: str, key: str = Query(default="")): """Delete a todo item.""" _verify_key(key) data = _load_userdata(user) original_count = len(data.get("todos", [])) data["todos"] = [t for t in data.get("todos", []) if t.get("id") != todo_id] if len(data["todos"]) == original_count: raise HTTPException(status_code=404, detail="Todo not found") new_ts = _update_todos_modified(data) _save_userdata(user, data) return Response( content=json.dumps({"success": True, "deleted": todo_id, "modified": new_ts}, ensure_ascii=False), media_type="application/json; charset=utf-8" ) # ========== MOTIVATION ========== @APP.get("/userdata/{user}/motivation") def get_motivation(user: str): """Get all motivation quotes.""" data = _load_userdata(user) quotes = data.get("motivation", []) return Response( content=json.dumps({ "quotes": quotes, "count": len(quotes) }, ensure_ascii=False), media_type="application/json; charset=utf-8" ) @APP.get("/userdata/{user}/motivation/random") def get_random_motivation(user: str): """Get a random motivation quote.""" data = _load_userdata(user) quotes = data.get("motivation", []) if not quotes: return Response( content=json.dumps({ "quote": "Add some motivation quotes!", "index": -1, "total": 0 }, ensure_ascii=False), media_type="application/json; charset=utf-8" ) idx = random.randint(0, len(quotes) - 1) return Response( content=json.dumps({ "quote": quotes[idx], "index": idx, "total": len(quotes) }, ensure_ascii=False), media_type="application/json; charset=utf-8" ) @APP.post("/userdata/{user}/motivation") def add_motivation(user: str, body: MotivationItem, key: str = Query(default="")): """Add a new motivation quote.""" _verify_key(key) data = _load_userdata(user) if not data.get("motivation"): data["motivation"] = [] quote_text = body.text.strip() if quote_text and quote_text not in data["motivation"]: data["motivation"].append(quote_text) _save_userdata(user, data) return Response( content=json.dumps({ "success": True, "quotes": data["motivation"], "count": len(data["motivation"]) }, ensure_ascii=False), media_type="application/json; charset=utf-8" ) @APP.delete("/userdata/{user}/motivation/{index}") def delete_motivation(user: str, index: int, key: str = Query(default="")): """Delete a motivation quote by index.""" _verify_key(key) data = _load_userdata(user) quotes = data.get("motivation", []) if index < 0 or index >= len(quotes): raise HTTPException(status_code=404, detail="Quote not found") deleted = quotes.pop(index) _save_userdata(user, data) return Response( content=json.dumps({ "success": True, "deleted": deleted, "quotes": quotes, "count": len(quotes) }, ensure_ascii=False), media_type="application/json; charset=utf-8" ) --- apiVersion: apps/v1 kind: Deployment metadata: annotations: argocd.argoproj.io/tracking-id: glance:apps/Deployment:glance-system/glance-helper reloader.stakater.com/auto: "true" name: glance-helper namespace: glance-system spec: progressDeadlineSeconds: 600 replicas: 1 revisionHistoryLimit: 10 selector: matchLabels: app: glance-helper strategy: type: Recreate template: metadata: labels: app: glance-helper app.kubernetes.io/name: glance-helper spec: containers: - args: - |- set -eux; export DEBIAN_FRONTEND=noninteractive; apt-get update; apt-get install -y --no-install-recommends curl ca-certificates iputils-ping dnsutils tzdata net-tools; rm -rf /var/lib/apt/lists/*; pip install --no-cache-dir fastapi uvicorn requests beautifulsoup4 prometheus-client icalendar python-dateutil pydantic; python -c "import uvicorn; uvicorn.run('app:APP', host='0.0.0.0', port=8000)" command: - /bin/sh - -lc env: - name: IDOKEP_URL value: https://www.idokep.hu/idojaras/Budapest%20VII.%20ker - name: PLACE_NAME value: Budapest VII. ker - name: TANDOOR_INTERNAL_URL value: http://tandoor.tandoor-system.svc.cluster.local:8080 - name: TANDOOR_PUBLIC_URL value: https://tandoor.dooplex.hu - name: TANDOOR_TOKEN value: tda_8a8b169c_5d1f_4962_83a2_0f2719c7d61a - name: GLANCE_HELPER_PUBLIC_URL value: https://glance-helper.dooplex.hu - name: DATA_DIR value: /data - name: GLANCE_HELPER_KEY value: oplQqnLnJK2vErRVYJpvVUcSDBOSbCHZSbsYY2bwSifgTMfT - name: TZ value: Europe/Budapest # Version Checker configuration - name: VERSION_CHECKER_URL value: http://version-checker.version-checker-system.svc.cluster.local:8080/metrics - name: VERSION_CHECKER_EXCLUDE_IMAGES value: "(^|.*/)(busybox|redis|alpine|nginx|mariadb|mysql|valkey)$|^longhornio.*$|(^|.*/)postgres.*$|^registry\\.k8s\\.io/ingress-nginx/.*$" # Calendar iCal URLs (JSON object: {"name": "url", ...}) - name: CALENDAR_ICAL_URLS value: '{"Órák": "https://calendar.google.com/calendar/ical/b2884faf3db792ac082a6206057552c79080716efd5f966e169a41fc500e1c1c%40group.calendar.google.com/private-0998d8053909ba4449c2f0a6409ce3de/basic.ics", "Családi": "https://calendar.google.com/calendar/ical/nitq3l0if4gn54k438obat5ia0%40group.calendar.google.com/private-59afcf70fee1a798ec369b86d9883b46/basic.ics"}' image: python:3.14-bookworm imagePullPolicy: IfNotPresent name: glance-helper ports: - containerPort: 8000 protocol: TCP resources: {} terminationMessagePath: /dev/termination-log terminationMessagePolicy: File volumeMounts: - mountPath: /app name: app - mountPath: /data name: data workingDir: /app dnsPolicy: ClusterFirst restartPolicy: Always schedulerName: default-scheduler securityContext: {} terminationGracePeriodSeconds: 30 volumes: - configMap: defaultMode: 420 name: glance-helper-app name: app - name: data persistentVolumeClaim: claimName: glance-helper-data --- apiVersion: v1 kind: Service metadata: name: glance-helper namespace: glance-system spec: selector: app.kubernetes.io/name: glance-helper ports: - name: http port: 8000 targetPort: 8000 --- apiVersion: v1 kind: Service metadata: name: idokep-scraper namespace: glance-system spec: selector: app.kubernetes.io/name: glance-helper ports: - name: http port: 8000 targetPort: 8000 --- apiVersion: networking.k8s.io/v1 kind: Ingress metadata: name: glance-helper namespace: glance-system annotations: cert-manager.io/cluster-issuer: letsencrypt-prod external-dns.alpha.kubernetes.io/hostname: glance-helper.dooplex.hu nginx.ingress.kubernetes.io/ssl-redirect: "true" spec: ingressClassName: nginx-internal rules: - host: glance-helper.dooplex.hu http: paths: - path: / pathType: Prefix backend: service: name: glance-helper port: number: 8000 tls: - hosts: - glance-helper.dooplex.hu secretName: glance-helper-tls