apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: glance-helper-data
  namespace: glance-system
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 200Mi
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: glance-helper-app
  namespace: glance-system
data:
  app.py: |-
    """Glance helper: notes widget, Időkép scraper, Tandoor picks, versions, calendar."""
    import json
    import os
    import random
    import re
    import time
    from datetime import datetime, timedelta, timezone
    from html import escape as html_escape
    from pathlib import Path
    from typing import Any, Dict, List, Optional
    from urllib.parse import urlencode
    from zoneinfo import ZoneInfo

    import requests
    from bs4 import BeautifulSoup
    from dateutil import parser as dateutil_parser
    from dateutil.rrule import rrulestr
    from fastapi import FastAPI, HTTPException, Query, Response
    from fastapi.responses import RedirectResponse
    from icalendar import Calendar as ICalCalendar
    from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST

    APP = FastAPI()

    JSON_MEDIA_TYPE = "application/json; charset=utf-8"


    def _json_response(payload) -> Response:
        """Serialize *payload* as UTF-8 JSON without escaping non-ASCII text."""
        return Response(content=json.dumps(payload, ensure_ascii=False), media_type=JSON_MEDIA_TYPE)


    def _local_tz():
        """Return Europe/Budapest, or UTC when tzdata is unavailable."""
        try:
            return ZoneInfo("Europe/Budapest")
        except Exception:
            return timezone.utc


    def _safe_user(user: str) -> str:
        """Reduce a username to alphanumerics, dash and underscore ("default" if empty)."""
        return re.sub(r'[^a-zA-Z0-9_-]', '', user) or "default"


    # ================================
    # Simple Notes Widget - Multi-user
    # ================================

    def get_notes_file(user: str) -> str:
        """Return the notes file path for *user* (sanitized) under DATA_DIR."""
        data_dir = os.environ.get("DATA_DIR", "/data")
        return os.path.join(data_dir, f"notes_{_safe_user(user)}.txt")


    def load_notes(user: str) -> str:
        """Load the notes of *user*; return "" when missing or unreadable."""
        try:
            with open(get_notes_file(user), "r", encoding="utf-8") as f:
                return f.read()
        except FileNotFoundError:
            return ""
        except Exception as e:
            print(f"Error loading notes for {user}: {e}")
            return ""


    def save_notes(user: str, content: str) -> bool:
        """Write *content* as the notes of *user*; return True on success."""
        try:
            with open(get_notes_file(user), "w", encoding="utf-8") as f:
                f.write(content)
            return True
        except Exception as e:
            print(f"Error saving notes for {user}: {e}")
            return False


    @APP.get("/notes")
    def notes_widget(key: str = "", user: str = "default", accent: str = "ffffff", bgcolor: str = ""):
        """Serve the notes widget HTML page for a specific user with optional theme."""
        expected_key = os.environ.get("GLANCE_HELPER_KEY", "")
        if key != expected_key:
            return Response(content="Unauthorized", status_code=401)

        safe_user = _safe_user(user)
        # Colors are restricted to at most six hex digits.
        safe_accent = re.sub(r'[^a-fA-F0-9]', '', accent)[:6] or "ffffff"
        safe_bgcolor = re.sub(r'[^a-fA-F0-9]', '', bgcolor)[:6] or "2d1f3d"

        # Notes are user-supplied text: escape before embedding in markup.
        escaped_notes = html_escape(load_notes(safe_user), quote=True)

        # NOTE(review): the widget markup is missing from the reviewed source (the
        # template body is empty). Restore it here; presumably it interpolates
        # safe_user, safe_accent, safe_bgcolor and escaped_notes - confirm.
        html = f"""
        """
        return Response(content=html, media_type="text/html")


    @APP.post("/notes/save")
    async def save_notes_api(key: str = "", user: str = "default", content: dict = None):
        """API endpoint to save notes for a specific user (JSON body: {"content": ...})."""
        expected_key = os.environ.get("GLANCE_HELPER_KEY", "")
        if key != expected_key:
            return Response(content="Unauthorized", status_code=401)

        safe_user = _safe_user(user)
        if content and "content" in content:
            if save_notes(safe_user, content["content"]):
                return {"status": "ok", "user": safe_user}
        return Response(content="Failed to save", status_code=500)


    # ================================
    # Időkép configuration
    # ================================
    IDOKEP_URL = os.getenv("IDOKEP_URL", "https://www.idokep.hu/idojaras/Budapest%20VII.%20ker")
    PLACE_NAME = os.getenv("PLACE_NAME", "Budapest VII. ker")
    SOURCE_NAME = "Időkép"
    UA = os.getenv("USER_AGENT", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome Safari")

    SCRAPES = Counter("idokep_scrapes_total", "Total Időkép scrapes", ["place", "status"])
    SCRAPE_SECONDS = Histogram("idokep_scrape_seconds", "Időkép scrape duration in seconds", ["place"])

    ICON_CONDITIONS = {
        # Daytime (0xx)
        "010": "Derült",
        "011": "Pára",
        "021": "Gyengén felhős",
        "022": "Közepesen felhős",
        "023": "Erősen felhős",
        "030": "Borult",
        "040": "Ködszitálás",
        "041": "Szitálás",
        "042": "Gyenge eső",
        "043": "Eső",
        "043s": "Eső viharos széllel",
        "051": "Havas eső",
        "052": "Ónos eső",
        "061": "Havazás",
        "062": "Havazás",
        "081": "Zápor",
        "083": "Hózápor",
        "088": "Jégeső",
        "090": "Zivatar",
        "092": "Száraz zivatar",
        "100": "Köd",
        "101": "Porvihar",
        # Nighttime (3xx) - same conditions
        "310": "Derült",
        "311": "Pára",
        "321": "Gyengén felhős",
        "322": "Közepesen felhős",
        "323": "Erősen felhős",
        "330": "Borult",
        "340": "Ködszitálás",
        "341": "Szitálás",
        "342": "Gyenge eső",
        "343": "Eső",
        "343s": "Eső viharos széllel",
        "351": "Havas eső",
        "352": "Ónos eső",
        "361": "Havazás",
        "362": "Havazás",
        "381": "Zápor",
        "383": "Hózápor",
        "388": "Jégeső",
        "390": "Zivatar",
        "392": "Száraz zivatar",
        "400": "Köd",
        "401": "Porvihar",
    }

    # Icon URL format: https://www.idokep.hu/assets/forecast-icons/XXX.svg (or .png)
    _ICON_CODE_RE = re.compile(r'/(\d{3}s?)\.(svg|png)')


    def _icon_condition(icon_url: str) -> str:
        """Extract the weather condition text from an icon URL ("" if unknown)."""
        if not icon_url:
            return ""
        match = _ICON_CODE_RE.search(icon_url)
        return ICON_CONDITIONS.get(match.group(1), "") if match else ""


    def _abs_url(url):
        """Prefix site-relative URLs with the Időkép origin; pass others through."""
        return "https://www.idokep.hu" + url if url and not url.startswith("http") else url


    def _to_float(s):
        """Parse a temperature label such as "12,5°C"; return None if not numeric."""
        try:
            return float(s.strip().replace("˚C", "").replace("°C", "").replace("°", "").replace(",", "."))
        except (ValueError, AttributeError):
            return None


    def _calculate_gradient_data(daily_items):
        """Add temperature-bar layout percentages (c_* keys, plain floats) to each day."""
        if not daily_items:
            return daily_items

        # 1. Global min/max for the period, padded by one degree each side.
        mins = [d["tmin_c"] for d in daily_items if d["tmin_c"] is not None]
        maxs = [d["tmax_c"] for d in daily_items if d["tmax_c"] is not None]
        if not mins or not maxs:
            return daily_items

        g_min = min(mins) - 1.0
        g_max = max(maxs) + 1.0
        span = g_max - g_min
        if span <= 0:
            span = 1.0

        # 2. Position of a temperature on the global scale, in percent.
        def get_pct(t):
            return (t - g_min) / span * 100.0

        # 3. Gradient colour stops at fixed temperatures.
        stops = {
            "c_s1": get_pct(-10),  # White
            "c_s2": get_pct(0),    # Blue
            "c_s3": get_pct(15),   # Purple
            "c_s4": get_pct(25),   # Pink
            "c_s5": get_pct(35),   # Red
        }

        # 4. Per-day bar offset/width and the inner gradient scaling.
        for d in daily_items:
            tmin, tmax = d["tmin_c"], d["tmax_c"]
            if tmin is None or tmax is None:
                continue

            w_pct = (tmax - tmin) / span * 100.0
            if w_pct < 2:
                w_pct = 2  # keep very narrow ranges visible
            l_pct = get_pct(tmin)

            # Stored as floats (no "%" symbol here).
            d["c_l"] = l_pct
            d["c_w"] = w_pct
            d["c_gw"] = (100.0 / w_pct) * 100.0
            d["c_ml"] = -(l_pct / w_pct) * 100.0
            d.update(stops)

        return daily_items


    def scrape() -> Dict[str, Any]:
        """Fetch and parse the Időkép page into current/hourly/daily data.

        Raises whatever the HTTP request or parsing raises, after logging it.
        """
        try:
            r = requests.get(IDOKEP_URL, headers={"User-Agent": UA}, timeout=15)
            r.raise_for_status()
            soup = BeautifulSoup(r.text, "html.parser")

            # Current
            cur_temp_el = soup.select_one(".current-temperature")
            cur_cond_el = soup.select_one(".current-weather")
            cur_icon_el = soup.select_one(".forecast-bigicon")

            # Hourly
            hourly = []
            for card in soup.select(".ik.hourly-forecast-card")[:8]:
                t = card.select_one(".ik.hourly-forecast-hour")
                temp = card.select_one(".ik.temperature-circled")
                icon = card.select_one("img.ik.forecast-icon")
                if t and temp:
                    # A card without an icon must not abort the whole scrape.
                    icon_url = _abs_url(icon.get("src") if icon else None)
                    hourly.append({
                        "time": t.get_text(strip=True),
                        "temp_c": _to_float(temp.get_text(strip=True)),
                        "icon_url": icon_url,
                        "condition": _icon_condition(icon_url),
                    })

            # Daily
            daily = []
            for col in soup.select(".ik.daily-forecast-container .ik.dailyForecastCol")[:15]:
                dow = col.select_one(".ik.dfDay")
                daynum = col.select_one(".ik.dfDayNum")
                icon = col.select_one("img.ik.forecast-icon")
                tmax = col.select_one("div.ik.max")
                tmin = col.select_one("div.ik.min")

                v_tmax, v_tmin = None, None
                if tmax and tmin:
                    v_tmax = _to_float(tmax.get_text(strip=True))
                    v_tmin = _to_float(tmin.get_text(strip=True))
                else:
                    # Fallback for holiday layout
                    vals = [a.get_text(strip=True) for a in col.select(".ik.min-max-container a")]
                    vals = [v for v in vals if re.fullmatch(r"-?\d+", v)]
                    if len(vals) >= 2:
                        v_tmax, v_tmin = _to_float(vals[0]), _to_float(vals[1])

                if v_tmax is not None and v_tmin is not None:
                    icon_url = _abs_url(icon.get("src") if icon else None)
                    daily.append({
                        "daynum": daynum.get_text(strip=True) if daynum else "",
                        "dow": dow.get_text(strip=True) if dow else "",
                        "tmin_c": v_tmin,
                        "tmax_c": v_tmax,
                        "icon_url": icon_url,
                        "condition": _icon_condition(icon_url),
                    })

            daily = _calculate_gradient_data(daily[:5])

            return {
                "source": {"name": SOURCE_NAME, "url": IDOKEP_URL},
                "location": {"name": PLACE_NAME},
                "current": {
                    "temp_c": _to_float(cur_temp_el.get_text(strip=True)) if cur_temp_el else 0,
                    "condition": cur_cond_el.get_text(strip=True) if cur_cond_el else "",
                    "icon_url": _abs_url(cur_icon_el.get("src")) if cur_icon_el else ""
                },
                "hourly": hourly,
                "daily": daily,
                "fetched_at_unix": int(time.time()),
            }
        except Exception as e:
            print(f"Scrape error: {e}")
            raise


    @APP.get("/api")
    def api():
        """Scrape Időkép and return the result as JSON, recording scrape metrics."""
        try:
            # Observe the duration so idokep_scrape_seconds is actually populated.
            with SCRAPE_SECONDS.labels(place=PLACE_NAME).time():
                data = scrape()
        except Exception:
            SCRAPES.labels(place=PLACE_NAME, status="error").inc()
            raise
        SCRAPES.labels(place=PLACE_NAME, status="ok").inc()
        return _json_response(data)


    @APP.get("/metrics")
    def metrics():
        """Expose Prometheus metrics."""
        return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)


    # ================================
    # Tandoor "Meal of the Day" - Enhanced Version
    # ================================
    TANDOOR_INTERNAL_URL = os.getenv("TANDOOR_INTERNAL_URL", "").rstrip("/")
    TANDOOR_PUBLIC_URL = os.getenv("TANDOOR_PUBLIC_URL", "").rstrip("/")
    GLANCE_HELPER_PUBLIC_URL = os.getenv("GLANCE_HELPER_PUBLIC_URL", "").rstrip("/")
    GLANCE_HELPER_KEY = os.getenv("GLANCE_HELPER_KEY", "")

    DATA_DIR = Path(os.getenv("DATA_DIR", "/data"))
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    COOKED_PATH = DATA_DIR / "tandoor-cooked.json"
    PICKS_PATH = DATA_DIR / "tandoor-picks.json"

    # Cooldown: don't suggest recipes cooked within this many days
    TANDOOR_COOLDOWN_DAYS = int(os.getenv("TANDOOR_COOLDOWN_DAYS", "14"))


    def _today_str() -> str:
        """YYYY-MM-DD in Europe/Budapest if tzdata exists, else UTC."""
        return datetime.now(_local_tz()).strftime("%Y-%m-%d")


    def _load_json(path: Path, default):
        """Read JSON from *path*; return *default* when missing or unreadable."""
        try:
            if path.exists():
                return json.loads(path.read_text(encoding="utf-8"))
        except Exception as e:
            # Best effort: fall back to the default, but leave a trace.
            print(f"Error reading {path}: {e}")
        return default


    def _save_json(path: Path, obj) -> None:
        """Write *obj* as JSON via a temp file and rename, avoiding torn writes."""
        tmp = path.with_suffix(path.suffix + ".tmp")
        tmp.write_text(json.dumps(obj, ensure_ascii=False, indent=2), encoding="utf-8")
        tmp.replace(path)


    def _tandoor_headers():
        """Return Tandoor API request headers; HTTP 500 if TANDOOR_TOKEN is unset."""
        token = os.getenv("TANDOOR_TOKEN", "")
        if not token:
            raise HTTPException(status_code=500, detail="TANDOOR_TOKEN is not set")
        return {"Authorization": f"Bearer {token}", "Accept": "application/json"}


    def _rewrite_to_public(url: str) -> str:
        """Turn internal URLs into public ones (images/links)."""
        if not url:
            return url
        if TANDOOR_PUBLIC_URL and TANDOOR_INTERNAL_URL and url.startswith(TANDOOR_INTERNAL_URL):
            return TANDOOR_PUBLIC_URL + url[len(TANDOOR_INTERNAL_URL):]
        return url


    def _fetch_all_recipes() -> list[dict]:
        """Fetch every recipe from Tandoor, following pagination.

        Raises HTTPException 500 when unconfigured, 502 on a non-200 reply.
        """
        if not TANDOOR_INTERNAL_URL:
            raise HTTPException(status_code=500, detail="TANDOOR_INTERNAL_URL is not set")

        headers = _tandoor_headers()
        url = f"{TANDOOR_INTERNAL_URL}/api/recipe/?page_size=200"
        out = []
        for _ in range(100):  # safety cap on the number of pages
            r = requests.get(url, headers=headers, timeout=15)
            if r.status_code != 200:
                raise HTTPException(status_code=502, detail=f"Tandoor returned {r.status_code}: {r.text[:200]}")
            j = r.json()
            out.extend(j.get("results", []))
            url = j.get("next")
            if not url:
                break
        return out


    # ================================
    # Cooked History Management
    # ================================
    # Structure: {"<recipe_id>": ["2026-01-22", "2026-01-15", ...], ...}

    def _load_cooked_history() -> dict[str, list[str]]:
        """Load cooked history. Returns {recipe_id_str: [date_str, ...]}"""
        return _load_json(COOKED_PATH, {})


    def _save_cooked_history(history: dict[str, list[str]]) -> None:
        """Persist the cooked history."""
        _save_json(COOKED_PATH, history)


    def _get_cooked_count(history: dict, recipe_id: int) -> int:
        """Get total times a recipe has been cooked."""
        return len(history.get(str(recipe_id), []))


    def _get_recently_cooked_ids(history: dict, days: int) -> set[int]:
        """Get recipe IDs cooked within the last N days."""
        if days <= 0:
            return set()
        cutoff = (datetime.now(_local_tz()) - timedelta(days=days)).strftime("%Y-%m-%d")
        # YYYY-MM-DD strings order the same way as the dates they encode.
        return {int(rid) for rid, dates in history.items() if any(d >= cutoff for d in dates)}


    def _record_cooked(recipe_id: int) -> dict[str, list[str]]:
        """Record that a recipe was cooked today. Returns updated history."""
        history = _load_cooked_history()
        today = _today_str()
        dates = history.setdefault(str(recipe_id), [])
        # Avoid duplicate entries for same day
        if today not in dates:
            dates.append(today)
            # Keep sorted (newest first) for easier reading
            dates.sort(reverse=True)
        _save_cooked_history(history)
        return history


    # ================================
    # Daily Picks Logic
    # ================================

    def _compute_daily_picks(count: int, cooldown_days: int) -> dict:
        """Return today's picks document, regenerating and saving it when stale.

        Picks are regenerated when the date, count or cooldown changed, or when
        fewer than *count* picks are stored and more recipes are eligible.
        """
        today = _today_str()
        history = _load_cooked_history()
        recently_cooked = _get_recently_cooked_ids(history, cooldown_days)
        picks_doc = _load_json(PICKS_PATH, {})

        recipes = None
        stale = (
            picks_doc.get("date") != today
            or picks_doc.get("count") != count
            or picks_doc.get("cooldown") != cooldown_days
        )
        if not stale:
            # Check if we can add more picks (new recipes added mid-day)
            current_ids = set(picks_doc.get("ids", []))
            if len(current_ids) >= count:
                return picks_doc
            recipes = _fetch_all_recipes()
            blocked = current_ids | recently_cooked
            if not any(r.get("id") is not None and r.get("id") not in blocked for r in recipes):
                return picks_doc

        # Generate new picks (reuse the list fetched above, if any)
        if recipes is None:
            recipes = _fetch_all_recipes()
        available = [r for r in recipes if r.get("id") is not None and r.get("id") not in recently_cooked]
        if not available:
            # Fallback: if everything is in cooldown, ignore cooldown
            available = recipes[:]

        chosen = random.sample(available, k=min(count, len(available))) if available else []
        picks_doc = {
            "date": today,
            "count": count,
            "cooldown": cooldown_days,
            "ids": [r.get("id") for r in chosen if r.get("id") is not None]
        }
        _save_json(PICKS_PATH, picks_doc)
        return picks_doc


    def _build_items_from_ids(ids: list[int], history: dict) -> tuple[list[dict], int]:
        """Build widget items for *ids*; return (items, total recipe count)."""
        recipes = _fetch_all_recipes()
        by_id = {r.get("id"): r for r in recipes if r.get("id") is not None}

        items = []
        for rid in ids:
            r = by_id.get(rid)
            if not r:
                continue  # recipe no longer exists in Tandoor

            cook_params = {"id": rid}
            if GLANCE_HELPER_KEY:
                cook_params["key"] = GLANCE_HELPER_KEY
            cook_url = f"{GLANCE_HELPER_PUBLIC_URL}/tandoor/cook?{urlencode(cook_params)}" if GLANCE_HELPER_PUBLIC_URL else ""

            items.append({
                "id": rid,
                "name": r.get("name") or "",
                "image": _rewrite_to_public(r.get("image") or ""),
                "url": f"{TANDOOR_PUBLIC_URL}/recipe/{rid}" if TANDOOR_PUBLIC_URL else "",
                "cook_url": cook_url,
                "cooked_count": _get_cooked_count(history, rid)
            })

        return items, len(recipes)


    @APP.get("/tandoor/daily")
    def tandoor_daily(count: int = Query(3, ge=1, le=10), cooldown: int = Query(None, ge=0, le=365)):
        """
        Get daily meal suggestions.
        - count: Number of suggestions (1-10, default 3)
        - cooldown: Days to exclude recently cooked meals (0-365, default from env TANDOOR_COOLDOWN_DAYS or 14)
        """
        cooldown_days = cooldown if cooldown is not None else TANDOOR_COOLDOWN_DAYS

        picks_doc = _compute_daily_picks(count, cooldown_days)
        history = _load_cooked_history()
        items, total = _build_items_from_ids(picks_doc.get("ids", []), history)

        return {
            "date": picks_doc.get("date"),
            "total_recipes": total,
            "cooldown_days": cooldown_days,
            "items": items
        }


    @APP.get("/tandoor/cook")
    def tandoor_cook(id: int, key: str | None = None, redirect: str | None = None):
        """Mark a recipe as cooked today, drop it from today's picks and try to refill.

        Raises HTTPException 403 when a key is configured and *key* does not match.
        """
        if GLANCE_HELPER_KEY and key != GLANCE_HELPER_KEY:
            raise HTTPException(status_code=403, detail="Invalid key")

        today = _today_str()

        # Record in permanent history
        history = _record_cooked(id)
        recently_cooked = _get_recently_cooked_ids(history, TANDOOR_COOLDOWN_DAYS)

        # Remove from today's picks and try to refill
        picks = _load_json(PICKS_PATH, {})
        if picks.get("date") == today and isinstance(picks.get("ids"), list):
            ids = [x for x in picks["ids"] if x != id]
            target = int(picks.get("count") or len(ids))

            if len(ids) < target:
                avoid = set(ids) | recently_cooked
                candidates = [
                    r.get("id") for r in _fetch_all_recipes()
                    if r.get("id") is not None and r.get("id") not in avoid
                ]
                if candidates:
                    ids.append(random.choice(candidates))

            picks["ids"] = ids[:target]
            _save_json(PICKS_PATH, picks)

        if redirect:
            # NOTE(review): redirect target is caller-supplied and not validated.
            return RedirectResponse(url=redirect, status_code=302)

        return {
            "ok": True,
            "date": today,
            "cooked_id": id,
            "cooked_total": _get_cooked_count(history, id)
        }


    @APP.get("/tandoor/history")
    def tandoor_history():
        """Get full cooked history (for debugging/stats)."""
        history = _load_cooked_history()
        return {
            "recipes": {int(k): {"dates": v, "count": len(v)} for k, v in history.items()},
            "total_cooks": sum(len(v) for v in history.values())
        }


    # ================================
    # Version Checker - Container Version Monitoring
    # ================================
    # Scrapes version-checker Prometheus metrics, parses versions,
    # and returns properly sorted results with the NEWEST available version.

    VERSION_CHECKER_URL = os.getenv("VERSION_CHECKER_URL", "http://version-checker.version-checker-system.svc.cluster.local:8080/metrics")

    # Regex patterns for filtering (same as in Glance widget)
    VERSION_CHECKER_EXCLUDE_IMAGES = os.getenv("VERSION_CHECKER_EXCLUDE_IMAGES",
        r"(^|.*/)(busybox|redis|alpine|nginx|mariadb|mysql|valkey)$|^longhornio.*$|(^|.*/)postgres.*$|^registry\.k8s\.io/ingress-nginx/.*$"
    )

    _IS_LATEST_RE = re.compile(r'version_checker_is_latest_version\{([^}]+)\}\s+(\d+(?:\.\d+)?)')
    # Labels look like: container="foo",image="bar",current_version="1.0"
    _LABEL_RE = re.compile(r'(\w+)="([^"]*)"')


    def _parse_semver(version: str) -> tuple:
        """
        Parse version string into comparable tuple.
        Handles: v1.2.3, 1.2.3, v1.2.3-beta, 1.2.3-20251030, etc.
        Returns tuple that sorts correctly: (major, minor, patch, prerelease_flag, prerelease)
        """
        if not version:
            return (0, 0, 0, 0, "")

        # Digests are not orderable; keep them below every real version.
        if "sha256:" in version or version.startswith("sha"):
            return (0, 0, 0, 0, version)

        v = version.lstrip("v")

        # Remove @sha256:... suffix if present (e.g., "1.2.3@sha256:abc...")
        if "@" in v:
            v = v.split("@")[0]

        # Split off prerelease/build metadata: 1.2.3-beta, 1.2.3-20251030, 1.2.3+build
        prerelease = ""
        for sep in ("-", "+"):
            if sep in v:
                v, prerelease = v.split(sep, 1)
                break

        segments = v.split(".")

        def _num(i: int) -> int:
            return int(segments[i]) if i < len(segments) and segments[i].isdigit() else 0

        try:
            major, minor, patch = _num(0), _num(1), _num(2)
        except ValueError:
            # isdigit() accepts some characters int() rejects.
            major, minor, patch = 0, 0, 0

        # No prerelease sorts above a prerelease (1.2.3 > 1.2.3-beta).
        prerelease_flag = 1 if not prerelease else 0
        return (major, minor, patch, prerelease_flag, prerelease)


    def _compare_versions(v1: str, v2: str) -> int:
        """Compare two version strings. Returns -1 if v1 < v2, 0 if equal, 1 if v1 > v2."""
        t1 = _parse_semver(v1)
        t2 = _parse_semver(v2)
        if t1 < t2:
            return -1
        if t1 > t2:
            return 1
        return 0


    def _parse_prometheus_metrics(text: str) -> list[dict]:
        """
        Parse Prometheus metrics text format.
        Extracts version_checker_is_latest_version metrics.
        """
        return [
            {
                "labels": dict(_LABEL_RE.findall(m.group(1))),
                "value": float(m.group(2)),  # 1 = up to date, 0 = outdated
            }
            for m in _IS_LATEST_RE.finditer(text)
        ]


    def _fetch_version_checker_data() -> dict:
        """
        Fetch and process version-checker metrics.
        Returns structured data with deduplicated images and newest versions.
        Raises HTTPException 502 when the metrics cannot be fetched.
        """
        try:
            r = requests.get(VERSION_CHECKER_URL, timeout=30)
            r.raise_for_status()
            metrics = _parse_prometheus_metrics(r.text)
        except Exception as e:
            print(f"Version checker fetch error: {e}")
            raise HTTPException(status_code=502, detail=f"Failed to fetch version-checker: {e}") from e

        exclude_pattern = re.compile(VERSION_CHECKER_EXCLUDE_IMAGES) if VERSION_CHECKER_EXCLUDE_IMAGES else None

        # Group metrics by image:
        # {image: {"current_version": str, "latest_versions": [str], "is_latest": bool}}
        images: Dict[str, Dict] = {}
        for metric in metrics:
            labels = metric["labels"]
            image = labels.get("image", "")
            current = labels.get("current_version", "")
            latest = labels.get("latest_version", "")

            # Only regular containers (not init containers, for simplicity)
            if labels.get("container_type", "") != "container":
                continue
            if exclude_pattern and exclude_pattern.search(image):
                continue
            # sha256 versions can't be compared meaningfully
            if "sha256:" in current or "sha256:" in latest:
                continue

            entry = images.setdefault(image, {
                "current_version": current,
                "latest_versions": [],
                "is_latest": True
            })

            # Track all latest versions reported (for outdated images)
            if metric["value"] != 1:
                entry["is_latest"] = False
                if latest and latest not in entry["latest_versions"]:
                    entry["latest_versions"].append(latest)

        # Find the NEWEST version for each image
        up_to_date = []
        outdated = []
        for image, data in images.items():
            current = data["current_version"]
            if data["is_latest"]:
                up_to_date.append({
                    "image": image,
                    "current_version": current,
                    "latest_version": current
                })
                continue
            versions = data["latest_versions"]
            outdated.append({
                "image": image,
                "current_version": current,
                "latest_version": max(versions, key=_parse_semver) if versions else current,
                "all_newer_versions": len(versions)
            })

        # Sort by image name for consistent ordering
        outdated.sort(key=lambda x: x["image"])
        up_to_date.sort(key=lambda x: x["image"])

        return {
            "fetched_at_unix": int(time.time()),
            "summary": {
                "up_to_date": len(up_to_date),
                "outdated": len(outdated),
                "total": len(up_to_date) + len(outdated)
            },
            "outdated": outdated,
            "up_to_date": up_to_date
        }


    @APP.get("/versions")
    def versions_api():
        """
        Returns container version status with properly sorted newest versions.
        Response format optimized for Glance custom-api widget.
        """
        return _json_response(_fetch_version_checker_data())


    @APP.get("/versions/outdated")
    def versions_outdated():
        """Returns only outdated images (for simpler widget)."""
        data = _fetch_version_checker_data()
        return _json_response({
            "fetched_at_unix": data["fetched_at_unix"],
            "summary": data["summary"],
            "images": data["outdated"]
        })


    # ================================
    # Google Calendar iCal Integration
    # ================================

    # Calendar configuration - loaded from environment
    CALENDAR_ICAL_URLS = os.getenv("CALENDAR_ICAL_URLS", "")  # JSON: {"name": "url", ...}


    def _format_relative_date(dt: datetime, tz) -> str:
        """Format date as 'Today', 'Tomorrow', or 'Jan 24' style."""
        today = datetime.now(tz).date()
        event_date = dt.date()
        if event_date == today:
            return "Today"
        if event_date == today + timedelta(days=1):
            return "Tomorrow"
        return dt.strftime("%b %d")


    def _parse_ical_urls() -> dict[str, str]:
        """Parse CALENDAR_ICAL_URLS environment variable."""
        if not CALENDAR_ICAL_URLS:
            return {}
        try:
            return json.loads(CALENDAR_ICAL_URLS)
        except Exception as e:
            print(f"Error parsing CALENDAR_ICAL_URLS: {e}")
            return {}


    def _fetch_ical(url: str, timeout: int = 15) -> str:
        """Fetch iCal content from URL; return "" on any failure."""
        try:
            r = requests.get(url, timeout=timeout, headers={"User-Agent": UA})
            r.raise_for_status()
            return r.text
        except Exception as e:
            # Feed URLs embed a private token; keep them (and the exception
            # text, which repeats the URL) out of the logs.
            print(f"Error fetching iCal feed: {type(e).__name__}")
            return ""


    def _as_local(dt: datetime, tz) -> datetime:
        """Make *dt* timezone-aware in *tz* (naive values are assumed local)."""
        return dt.replace(tzinfo=tz) if dt.tzinfo is None else dt.astimezone(tz)


    def _event_record(meta: dict, start: datetime, end, tz) -> dict:
        """Build the JSON record for one event occurrence."""
        return {
            "title": meta["title"],
            "description": meta["description"],
            "location": meta["location"],
            "calendar": meta["calendar"],
            "start": start.isoformat(),
            "start_unix": int(start.timestamp()),
            "start_date": start.strftime("%b %d"),
            "start_date_relative": _format_relative_date(start, tz),
            "start_time": start.strftime("%H:%M"),
            "start_weekday": start.strftime("%a"),
            "end": end.isoformat() if end else None,
            "end_unix": int(end.timestamp()) if end else None,
            "is_all_day": meta["is_all_day"],
            "uid": meta["uid"],
        }


    def _events_from_component(component, calendar_name: str, tz, now: datetime, cutoff: datetime) -> list[dict]:
        """Expand one VEVENT into its occurrences within the look-ahead window."""
        dtstart = component.get("DTSTART")
        if not dtstart:
            return []
        dtend = component.get("DTEND")
        start = dtstart.dt
        end = dtend.dt if dtend else None

        # All-day events carry a date rather than a datetime.
        is_all_day = not isinstance(start, datetime)
        if is_all_day:
            start = datetime.combine(start, datetime.min.time(), tzinfo=tz)
            if end and not isinstance(end, datetime):
                end = datetime.combine(end, datetime.min.time(), tzinfo=tz)
        else:
            start = _as_local(start, tz)
            if end:
                end = _as_local(end, tz)

        summary = str(component.get("SUMMARY", "No title"))
        meta = {
            "title": summary,
            "description": str(component.get("DESCRIPTION", "")) if component.get("DESCRIPTION") else "",
            "location": str(component.get("LOCATION", "")) if component.get("LOCATION") else "",
            "calendar": calendar_name,
            "is_all_day": is_all_day,
            "uid": str(component.get("UID", "")),
        }

        # All-day events start at local midnight, which is already in the past;
        # open their window at the start of today so today's still show up.
        window_start = now.replace(hour=0, minute=0, second=0, microsecond=0) if is_all_day else now

        events = []
        rrule = component.get("RRULE")
        if rrule:
            try:
                rule = rrulestr(rrule.to_ical().decode("utf-8"), dtstart=start)
                duration = end - start if end else None
                # Limit to 10 occurrences per event
                for occ in rule.between(window_start, cutoff, inc=True)[:10]:
                    if occ.tzinfo is None:
                        occ = occ.replace(tzinfo=tz)
                    occ_end = occ + duration if duration is not None else None
                    events.append(_event_record(meta, occ, occ_end, tz))
                return events
            except Exception as e:
                print(f"Error parsing RRULE for event '{summary}': {e}")
                # Fall back to treating it as a single event below.

        if window_start <= start <= cutoff:
            events.append(_event_record(meta, start, end, tz))
        return events


    def _parse_ical_events(ical_text: str, calendar_name: str, days_ahead: int = 30) -> list[dict]:
        """
        Parse iCal text and return upcoming events.
        Handles both single events and recurring events.
        """
        events = []
        if not ical_text:
            return events

        tz = _local_tz()
        now = datetime.now(tz)
        cutoff = now + timedelta(days=days_ahead)

        try:
            cal = ICalCalendar.from_ical(ical_text)
        except Exception as e:
            print(f"Error parsing iCal for {calendar_name}: {e}")
            return events

        for component in cal.walk():
            if component.name != "VEVENT":
                continue
            try:
                events.extend(_events_from_component(component, calendar_name, tz, now, cutoff))
            except Exception as e:
                # One malformed event must not hide the rest of the calendar.
                print(f"Error parsing iCal for {calendar_name}: {e}")

        return events


    @APP.get("/calendar/events")
    def calendar_events_api(
        count: int = Query(default=5, ge=1, le=50, description="Number of events to return"),
        days: int = Query(default=30, ge=1, le=365, description="Days to look ahead"),
        calendars: str = Query(default="", description="Comma-separated list of calendar names to include (empty = all)")
    ):
        """
        Returns upcoming calendar events from configured iCal feeds.
        Merges multiple calendars and sorts by start time.

        Use 'calendars' parameter to filter specific calendars (e.g., calendars=Családi,Órák)
        """
        all_calendars = _parse_ical_urls()
        if not all_calendars:
            return _json_response({
                "error": "No calendars configured",
                "events": [],
                "count": 0,
                "fetched_at_unix": int(time.time())
            })

        # Filter calendars if specified
        if calendars.strip():
            requested = [c.strip() for c in calendars.split(",") if c.strip()]
            filtered_calendars = {k: v for k, v in all_calendars.items() if k in requested}
        else:
            filtered_calendars = all_calendars

        all_events = []
        calendar_status = {}
        for name, url in filtered_calendars.items():
            ical_text = _fetch_ical(url)
            if ical_text:
                events = _parse_ical_events(ical_text, name, days)
                all_events.extend(events)
                calendar_status[name] = {"status": "ok", "events": len(events)}
            else:
                calendar_status[name] = {"status": "error", "events": 0}

        # Sort by start time and limit
        all_events.sort(key=lambda e: e["start_unix"])
        limited_events = all_events[:count]

        return _json_response({
            "events": limited_events,
            "count": len(limited_events),
            "total_available": len(all_events),
            "calendars": calendar_status,
            "fetched_at_unix": int(time.time())
        })
---
apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    argocd.argoproj.io/tracking-id: glance:apps/Deployment:glance-system/glance-helper
    reloader.stakater.com/auto: "true"
  name: glance-helper
  namespace: glance-system
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app: glance-helper
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        app: glance-helper
        app.kubernetes.io/name: glance-helper
    spec:
      containers:
        - args:
            # `exec` makes uvicorn replace the shell so it receives SIGTERM.
            - |-
              set -eux;
              export DEBIAN_FRONTEND=noninteractive;
              apt-get update;
              apt-get install -y --no-install-recommends curl ca-certificates iputils-ping dnsutils tzdata net-tools;
              rm -rf /var/lib/apt/lists/*;
              pip install --no-cache-dir fastapi uvicorn requests beautifulsoup4 prometheus-client icalendar python-dateutil;
              exec python -c "import uvicorn; uvicorn.run('app:APP', host='0.0.0.0', port=8000)"
          command:
            - /bin/sh
            - -lc
          env:
            - name: IDOKEP_URL
              value: https://www.idokep.hu/idojaras/Budapest%20VII.%20ker
            - name: PLACE_NAME
              value: Budapest VII. ker
            - name: TANDOOR_INTERNAL_URL
              value: http://tandoor.tandoor-system.svc.cluster.local:8080
            - name: TANDOOR_PUBLIC_URL
              value: https://tandoor.dooplex.hu
            # Credentials come from the `glance-helper-secrets` Secret rather than
            # being committed inline. Create that Secret out-of-band (SealedSecret,
            # ExternalSecret, kubectl, ...) with the keys below, and rotate the
            # values that were previously stored in this manifest.
            - name: TANDOOR_TOKEN
              valueFrom:
                secretKeyRef:
                  name: glance-helper-secrets
                  key: TANDOOR_TOKEN
            - name: GLANCE_HELPER_PUBLIC_URL
              value: https://glance-helper.dooplex.hu
            - name: DATA_DIR
              value: /data
            - name: GLANCE_HELPER_KEY
              valueFrom:
                secretKeyRef:
                  name: glance-helper-secrets
                  key: GLANCE_HELPER_KEY
            - name: TZ
              value: Europe/Budapest
            # Version Checker configuration
            - name: VERSION_CHECKER_URL
              value: http://version-checker.version-checker-system.svc.cluster.local:8080/metrics
            - name: VERSION_CHECKER_EXCLUDE_IMAGES
              value: "(^|.*/)(busybox|redis|alpine|nginx|mariadb|mysql|valkey)$|^longhornio.*$|(^|.*/)postgres.*$|^registry\\.k8s\\.io/ingress-nginx/.*$"
            # Calendar iCal URLs (JSON object: {"name": "url", ...}).
            # The private feed addresses act as credentials, so they live in the Secret too.
            - name: CALENDAR_ICAL_URLS
              valueFrom:
                secretKeyRef:
                  name: glance-helper-secrets
                  key: CALENDAR_ICAL_URLS
          image: python:3.12-bookworm
          imagePullPolicy: IfNotPresent
          name: glance-helper
          ports:
            - containerPort: 8000
              protocol: TCP
          resources: {}
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          volumeMounts:
            - mountPath: /app
              name: app
            - mountPath: /data
              name: data
          workingDir: /app
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      terminationGracePeriodSeconds: 30
      volumes:
        - configMap:
            defaultMode: 420
            name: glance-helper-app
          name: app
        - name: data
          persistentVolumeClaim:
            claimName: glance-helper-data
---
apiVersion: v1
kind: Service
metadata:
  name: glance-helper
  namespace: glance-system
spec:
  selector:
    app.kubernetes.io/name: glance-helper
  ports:
    - name: http
      port: 8000
      targetPort: 8000
---
apiVersion: v1
kind: Service
metadata:
  name: idokep-scraper
  namespace: glance-system
spec:
  selector:
    app.kubernetes.io/name: glance-helper
  ports:
    - name: http
      port: 8000
      targetPort: 8000
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: glance-helper
  namespace: glance-system
  annotations:
    cert-manager.io/cluster-issuer: letsencrypt-prod
    external-dns.alpha.kubernetes.io/hostname: glance-helper.dooplex.hu
    nginx.ingress.kubernetes.io/ssl-redirect: "true"
spec:
  ingressClassName: nginx-internal
  rules:
    - host: glance-helper.dooplex.hu
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: glance-helper
                port:
                  number: 8000
  tls:
    - hosts:
        - glance-helper.dooplex.hu
      secretName: glance-helper-tls