Files
homelab-manifests/glance-system/glance-helper.yaml
T
2026-02-01 21:06:19 +01:00

1403 lines
52 KiB
YAML

# PersistentVolumeClaim requesting a 200Mi ReadWriteOnce volume in the
# glance-system namespace.
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: glance-helper-data
  namespace: glance-system
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 200Mi
---
apiVersion: v1
kind: ConfigMap
metadata:
name: glance-helper-app
namespace: glance-system
data:
app.py: |-
import os
import time
import re
from typing import List, Dict, Any, Optional
import requests
from bs4 import BeautifulSoup
from fastapi import FastAPI, Response
from fastapi.middleware.cors import CORSMiddleware
from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST
# Application object; every route in this module is registered on it.
APP = FastAPI()
# Add CORS middleware to allow cross-origin requests from Glance dashboards.
# Only the listed origins are accepted; credentials are allowed, any request
# header is accepted, and the listed HTTP methods are permitted.
APP.add_middleware(
    CORSMiddleware,
    allow_origins=["https://kisfenyo.dooplex.hu", "https://orsi.dooplex.hu", "https://glance-helper.dooplex.hu"],
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
    allow_headers=["*"],
)
# ================================
# Időkép configuration
# ================================
# Forecast page downloaded by scrape(); overridable through the IDOKEP_URL env var.
IDOKEP_URL = os.getenv("IDOKEP_URL", "https://www.idokep.hu/idojaras/Budapest%20VII.%20ker")
# Display name returned under "location" and used as the Prometheus "place" label.
PLACE_NAME = os.getenv("PLACE_NAME", "Budapest VII. ker")
# Source attribution returned in the /api payload.
SOURCE_NAME = "Időkép"
# User-Agent header sent with the outgoing scrape and iCal requests.
UA = os.getenv("USER_AGENT", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome Safari")
# Counter of scrape attempts, labelled by place and outcome status.
SCRAPES = Counter("idokep_scrapes_total", "Total Időkép scrapes", ["place", "status"])
# Histogram of scrape durations in seconds, labelled by place.
SCRAPE_SECONDS = Histogram("idokep_scrape_seconds", "Időkép scrape duration in seconds", ["place"])
# Maps Időkép forecast-icon codes to Hungarian condition names.
# The literal below lists the daytime codes; every daytime code has a
# nighttime twin whose numeric part is 300 higher and whose meaning is the
# same (010 -> 310, 043s -> 343s, 100 -> 400).
ICON_CONDITIONS = {
    "010": "Derült",
    "011": "Pára",
    "021": "Gyengén felhős",
    "022": "Közepesen felhős",
    "023": "Erősen felhős",
    "030": "Borult",
    "040": "Ködszitálás",
    "041": "Szitálás",
    "042": "Gyenge eső",
    "043": "Eső",
    "043s": "Eső viharos széllel",
    "051": "Havas eső",
    "052": "Ónos eső",
    "061": "Havazás",
    "062": "Havazás",
    "081": "Zápor",
    "083": "Hózápor",
    "088": "Jégeső",
    "090": "Zivatar",
    "092": "Száraz zivatar",
    "100": "Köd",
    "101": "Porvihar",
}
# Nighttime (3xx/4xx) entries, derived from the daytime ones in the same order.
ICON_CONDITIONS.update({
    f"{int(day_code[:3]) + 300}{day_code[3:]}": condition
    for day_code, condition in list(ICON_CONDITIONS.items())
})
def _icon_condition(icon_url: str) -> str:
"""Extract weather condition from icon URL."""
if not icon_url:
return ""
# URL format: https://www.idokep.hu/assets/forecast-icons/XXX.svg (or .png)
import re
match = re.search(r'/(\d{3}s?)\.(svg|png)', icon_url)
if match:
code = match.group(1)
return ICON_CONDITIONS.get(code, "")
return ""
def _abs_url(url): return "https://www.idokep.hu" + url if url and not url.startswith("http") else url
def _to_float(s):
try: return float(s.strip().replace("˚C", "").replace("°C", "").replace("°", "").replace(",", "."))
except: return None
def _calculate_gradient_data(daily_items):
if not daily_items: return daily_items
# 1. Global Min/Max for the week
mins = [d["tmin_c"] for d in daily_items if d["tmin_c"] is not None]
maxs = [d["tmax_c"] for d in daily_items if d["tmax_c"] is not None]
if not mins or not maxs: return daily_items
g_min = min(mins) - 1.0
g_max = max(maxs) + 1.0
span = g_max - g_min
if span <= 0: span = 1.0
# 2. Helper to get percentage
def get_pct(t): return (t - g_min) / span * 100.0
# 3. Calculate stops (floats)
s_wht = get_pct(-10)
s_blu = get_pct(0)
s_pur = get_pct(15)
s_pnk = get_pct(25)
s_red = get_pct(35)
# 4. Inject individual floats into the dictionary
for d in daily_items:
tmin, tmax = d["tmin_c"], d["tmax_c"]
if tmin is None or tmax is None: continue
w_pct = (tmax - tmin) / span * 100.0
if w_pct < 2: w_pct = 2
l_pct = (tmin - g_min) / span * 100.0
inner_w = (100.0 / w_pct) * 100.0
inner_ml = -(l_pct / w_pct) * 100.0
# STORE AS FLOATS (No "%" symbol here)
d["c_l"] = l_pct
d["c_w"] = w_pct
d["c_gw"] = inner_w
d["c_ml"] = inner_ml
d["c_s1"] = s_wht # White
d["c_s2"] = s_blu # Blue
d["c_s3"] = s_pur # Purple
d["c_s4"] = s_pnk # Pink
d["c_s5"] = s_red # Red
return daily_items
def scrape() -> Dict[str, Any]:
    """Download the Időkép forecast page and extract current, hourly and daily data.

    Returns:
        A dict with "source", "location", "current", "hourly" (from the first
        8 hourly cards), "daily" (at most 5 days, enriched by
        _calculate_gradient_data) and "fetched_at_unix".

    Raises:
        Any exception from the HTTP request or from parsing. It is printed and
        re-raised so the caller can count the failure.
    """
    try:
        r = requests.get(IDOKEP_URL, headers={"User-Agent": UA}, timeout=15)
        r.raise_for_status()
        soup = BeautifulSoup(r.text, "html.parser")

        # Current
        cur_temp_el = soup.select_one(".current-temperature")
        cur_cond_el = soup.select_one(".current-weather")
        cur_icon_el = soup.select_one(".forecast-bigicon")

        # Hourly
        hourly = []
        for card in soup.select(".ik.hourly-forecast-card")[:8]:
            t = card.select_one(".ik.hourly-forecast-hour")
            temp = card.select_one(".ik.temperature-circled")
            icon = card.select_one("img.ik.forecast-icon")
            if t and temp:
                # A card without an <img> must not abort the whole scrape;
                # it gets no icon URL and an empty condition, like daily rows.
                icon_url = _abs_url(icon.get("src") if icon else None)
                hourly.append({
                    "time": t.get_text(strip=True),
                    "temp_c": _to_float(temp.get_text(strip=True)),
                    "icon_url": icon_url,
                    "condition": _icon_condition(icon_url)
                })

        # Daily
        daily = []
        for col in soup.select(".ik.daily-forecast-container .ik.dailyForecastCol")[:15]:
            dow = col.select_one(".ik.dfDay")
            daynum = col.select_one(".ik.dfDayNum")
            icon = col.select_one("img.ik.forecast-icon")
            tmax = col.select_one("div.ik.max")
            tmin = col.select_one("div.ik.min")
            v_tmax, v_tmin = None, None
            if tmax and tmin:
                v_tmax = _to_float(tmax.get_text(strip=True))
                v_tmin = _to_float(tmin.get_text(strip=True))
            else:
                # Fallback for holiday layout: take the first two integer
                # link texts in the min/max container as max and min.
                vals = [a.get_text(strip=True) for a in col.select(".ik.min-max-container a")]
                vals = [v for v in vals if re.fullmatch(r"-?\d+", v)]
                if len(vals) >= 2:
                    v_tmax, v_tmin = _to_float(vals[0]), _to_float(vals[1])
            if v_tmax is not None and v_tmin is not None:
                icon_url = _abs_url(icon.get("src") if icon else None)
                daily.append({
                    "daynum": daynum.get_text(strip=True) if daynum else "",
                    "dow": dow.get_text(strip=True) if dow else "",
                    "tmin_c": v_tmin,
                    "tmax_c": v_tmax,
                    "icon_url": icon_url,
                    "condition": _icon_condition(icon_url),
                })
        daily = _calculate_gradient_data(daily[:5])

        return {
            "source": {"name": SOURCE_NAME, "url": IDOKEP_URL},
            "location": {"name": PLACE_NAME},
            "current": {
                "temp_c": _to_float(cur_temp_el.get_text(strip=True)) if cur_temp_el else 0,
                "condition": cur_cond_el.get_text(strip=True) if cur_cond_el else "",
                "icon_url": _abs_url(cur_icon_el.get("src")) if cur_icon_el else ""
            },
            "hourly": hourly,
            "daily": daily,
            "fetched_at_unix": int(time.time()),
        }
    except Exception as e:
        print(f"Scrape error: {e}")
        raise
@APP.get("/api")
def api():
    """Run one scrape and return the result as UTF-8 JSON.

    Every call increments idokep_scrapes_total with status "ok" or "error"
    and records the scrape duration in idokep_scrape_seconds. A failed scrape
    is counted and then propagates to the framework.
    """
    import json

    status = "error"
    try:
        # The duration histogram was declared but never observed; time the
        # scrape so the metric actually carries data.
        with SCRAPE_SECONDS.labels(place=PLACE_NAME).time():
            data = scrape()
        status = "ok"
    finally:
        SCRAPES.labels(place=PLACE_NAME, status=status).inc()
    return Response(content=json.dumps(data, ensure_ascii=False), media_type="application/json; charset=utf-8")
@APP.get("/metrics")
def metrics():
    """Serve the output of generate_latest() with the Prometheus content type."""
    exposition = generate_latest()
    return Response(exposition, media_type=CONTENT_TYPE_LATEST)
# ================================
# Tandoor "Meal of the Day" - Enhanced Version
# ================================
from datetime import datetime, timezone, timedelta
from zoneinfo import ZoneInfo
from urllib.parse import urlencode
from fastapi import HTTPException, Query
from fastapi.responses import RedirectResponse
import json
import random
from pathlib import Path
# Base URL used for server-side calls to the Tandoor API (trailing "/" removed).
TANDOOR_INTERNAL_URL = os.getenv("TANDOOR_INTERNAL_URL", "").rstrip("/")
# Base URL substituted into recipe links and image URLs handed back to clients.
TANDOOR_PUBLIC_URL = os.getenv("TANDOOR_PUBLIC_URL", "").rstrip("/")
# Public base URL of this helper; used to build the "mark as cooked" links.
GLANCE_HELPER_PUBLIC_URL = os.getenv("GLANCE_HELPER_PUBLIC_URL", "").rstrip("/")
# Shared secret checked by the cook and write endpoints; empty disables the check.
GLANCE_HELPER_KEY = os.getenv("GLANCE_HELPER_KEY", "")
# Directory holding the JSON state files; created at import time if missing.
DATA_DIR = Path(os.getenv("DATA_DIR", "/data"))
DATA_DIR.mkdir(parents=True, exist_ok=True)
# Per-recipe cook dates.
COOKED_PATH = DATA_DIR / "tandoor-cooked.json"
# The stored daily picks document.
PICKS_PATH = DATA_DIR / "tandoor-picks.json"
# Cooldown: don't suggest recipes cooked within this many days
TANDOOR_COOLDOWN_DAYS = int(os.getenv("TANDOOR_COOLDOWN_DAYS", "14"))
def _today_str() -> str:
"""YYYY-MM-DD in Europe/Budapest if tzdata exists, else UTC."""
try:
tz = ZoneInfo("Europe/Budapest")
return datetime.now(tz).strftime("%Y-%m-%d")
except Exception:
return datetime.now(timezone.utc).strftime("%Y-%m-%d")
def _load_json(path: Path, default):
try:
if path.exists():
return json.loads(path.read_text(encoding="utf-8"))
except Exception:
pass
return default
def _save_json(path: Path, obj) -> None:
tmp = path.with_suffix(path.suffix + ".tmp")
tmp.write_text(json.dumps(obj, ensure_ascii=False, indent=2), encoding="utf-8")
tmp.replace(path)
def _tandoor_headers():
token = os.getenv("TANDOOR_TOKEN", "")
if not token:
raise HTTPException(status_code=500, detail="TANDOOR_TOKEN is not set")
return {"Authorization": f"Bearer {token}", "Accept": "application/json"}
def _rewrite_to_public(url: str) -> str:
"""Turn internal URLs into public ones (images/links)."""
if not url:
return url
if TANDOOR_PUBLIC_URL and TANDOOR_INTERNAL_URL and url.startswith(TANDOOR_INTERNAL_URL):
return TANDOOR_PUBLIC_URL + url[len(TANDOOR_INTERNAL_URL):]
return url
def _fetch_all_recipes() -> list[dict]:
    """Collect every recipe from the Tandoor API by following its pagination.

    Starts with a page size of 200 and follows each response's "next" link
    until it is empty, requesting at most 100 pages as a safety limit.

    Raises:
        HTTPException(500) if TANDOOR_INTERNAL_URL is not set.
        HTTPException(502) if Tandoor answers with a status other than 200.
    """
    if not TANDOOR_INTERNAL_URL:
        raise HTTPException(status_code=500, detail="TANDOOR_INTERNAL_URL is not set")

    collected = []
    page_url = f"{TANDOOR_INTERNAL_URL}/api/recipe/?page_size=200"
    pages_left = 100  # safety
    while pages_left > 0:
        pages_left -= 1
        resp = requests.get(page_url, headers=_tandoor_headers(), timeout=15)
        if resp.status_code != 200:
            raise HTTPException(status_code=502, detail=f"Tandoor returned {resp.status_code}: {resp.text[:200]}")
        page = resp.json()
        collected.extend(page.get("results", []))
        page_url = page.get("next")
        if not page_url:
            break
    return collected
# ================================
# Cooked History Management
# ================================
# Structure: {"<recipe_id>": ["2026-01-22", "2026-01-15", ...], ...}
def _load_cooked_history() -> dict[str, list[str]]:
    """Read the cooked-history file.

    Returns a mapping of recipe id (as a string) to a list of date strings;
    an empty dict is used as the fallback when nothing can be loaded.
    """
    history = _load_json(COOKED_PATH, {})
    return history
def _save_cooked_history(history: dict[str, list[str]]) -> None:
    """Persist the cooked-history mapping to COOKED_PATH."""
    _save_json(path=COOKED_PATH, obj=history)
def _get_cooked_count(history: dict, recipe_id: int) -> int:
"""Get total times a recipe has been cooked."""
return len(history.get(str(recipe_id), []))
def _get_recently_cooked_ids(history: dict, days: int) -> set[int]:
"""Get recipe IDs cooked within the last N days."""
if days <= 0:
return set()
try:
tz = ZoneInfo("Europe/Budapest")
except Exception:
tz = timezone.utc
cutoff = (datetime.now(tz) - timedelta(days=days)).strftime("%Y-%m-%d")
recent = set()
for rid_str, dates in history.items():
for d in dates:
if d >= cutoff:
recent.add(int(rid_str))
break
return recent
def _record_cooked(recipe_id: int) -> dict[str, list[str]]:
    """Add today's date to the recipe's cook dates and persist the history.

    A date is stored at most once per recipe, and the list is re-sorted
    newest first whenever a date is added. The history is saved on every call
    and the updated mapping is returned.
    """
    history = _load_cooked_history()
    key = str(recipe_id)
    today = _today_str()
    dates = history.setdefault(key, [])
    if today not in dates:
        dates.append(today)
        # Newest first, for easier reading of the stored file.
        history[key] = sorted(dates, reverse=True)
    _save_cooked_history(history)
    return history
# ================================
# Daily Picks Logic
# ================================
def _compute_daily_picks(count: int, cooldown_days: int) -> dict:
    """Return today's picks document, generating and saving a new one when needed.

    The stored document ({"date", "count", "cooldown", "ids"}) is reused unless
    its date, count or cooldown differs from the request, or it holds fewer
    than ``count`` ids while at least one other recipe outside the cooldown
    exists. New picks are a random sample of the recipes not cooked within
    ``cooldown_days``; if that leaves nothing, the cooldown is ignored.

    Raises:
        Whatever _fetch_all_recipes raises when the recipe list is needed.
    """
    today = _today_str()
    history = _load_cooked_history()
    recently_cooked = _get_recently_cooked_ids(history, cooldown_days)
    picks_doc = _load_json(PICKS_PATH, {})

    # The recipe list is fetched lazily and at most once. Previously a short
    # pick list caused one fetch for the check and a second for regeneration.
    recipes = None

    # Check if we need to regenerate picks
    needs_regen = (
        picks_doc.get("date") != today
        or picks_doc.get("count") != count
        or picks_doc.get("cooldown") != cooldown_days
    )
    if not needs_regen:
        # Check if we can add more picks (new recipes added mid-day)
        current_ids = set(picks_doc.get("ids", []))
        if len(current_ids) < count:
            recipes = _fetch_all_recipes()
            needs_regen = any(
                r.get("id") is not None
                and r.get("id") not in current_ids
                and r.get("id") not in recently_cooked
                for r in recipes
            )
    if not needs_regen:
        return picks_doc

    # Generate new picks
    if recipes is None:
        recipes = _fetch_all_recipes()
    available = [r for r in recipes
                 if r.get("id") is not None
                 and r.get("id") not in recently_cooked]
    if not available:
        # Fallback: if everything is in cooldown, ignore cooldown
        available = recipes[:]
    chosen = random.sample(available, k=min(count, len(available))) if available else []
    picks_doc = {
        "date": today,
        "count": count,
        "cooldown": cooldown_days,
        "ids": [r.get("id") for r in chosen if r.get("id") is not None]
    }
    _save_json(PICKS_PATH, picks_doc)
    return picks_doc
def _build_items_from_ids(ids: list[int], history: dict) -> tuple[list[dict], int]:
    """Turn recipe ids into the item dicts returned to the widget.

    Fetches all recipes and skips ids that are not among them. Returns
    (items, number of recipes fetched). Each item holds the id, name, public
    image URL, public recipe URL, a "mark as cooked" URL and the recipe's
    cooked count; the URLs are "" when the matching base URL is not set.
    """
    recipes = _fetch_all_recipes()
    lookup = {rec.get("id"): rec for rec in recipes if rec.get("id") is not None}

    items = []
    for rid in ids:
        rec = lookup.get(rid)
        if not rec:
            continue

        recipe_url = ""
        if TANDOOR_PUBLIC_URL:
            recipe_url = f"{TANDOOR_PUBLIC_URL}/recipe/{rid}"

        # NOTE(review): when a helper key is configured it is embedded in this
        # URL, which is part of the response body — confirm that is intended.
        query = {"id": rid}
        if GLANCE_HELPER_KEY:
            query["key"] = GLANCE_HELPER_KEY
        cook_url = ""
        if GLANCE_HELPER_PUBLIC_URL:
            cook_url = f"{GLANCE_HELPER_PUBLIC_URL}/tandoor/cook?{urlencode(query)}"

        items.append({
            "id": rid,
            "name": rec.get("name") or "",
            "image": _rewrite_to_public(rec.get("image") or ""),
            "url": recipe_url,
            "cook_url": cook_url,
            "cooked_count": _get_cooked_count(history, rid)
        })
    return items, len(recipes)
@APP.get("/tandoor/daily")
def tandoor_daily(count: int = Query(3, ge=1, le=10), cooldown: int = Query(None, ge=0, le=365)):
    """
    Return today's meal suggestions.
    - count: how many suggestions to return (1-10, default 3)
    - cooldown: days for which a cooked recipe is excluded (0-365); when omitted,
      TANDOOR_COOLDOWN_DAYS is used (env variable, default 14)
    """
    if cooldown is None:
        cooldown_days = TANDOOR_COOLDOWN_DAYS
    else:
        cooldown_days = cooldown
    picks_doc = _compute_daily_picks(count, cooldown_days)
    items, total = _build_items_from_ids(picks_doc.get("ids", []), _load_cooked_history())
    response = {"date": picks_doc.get("date")}
    response["total_recipes"] = total
    response["cooldown_days"] = cooldown_days
    response["items"] = items
    return response
@APP.get("/tandoor/cook")
def tandoor_cook(id: int, key: str | None = None, redirect: str | None = None):
    """Mark a recipe as cooked today and refill today's picks.

    - id: recipe id to record in the cooked history
    - key: must equal GLANCE_HELPER_KEY when one is configured, else 403
    - redirect: if given, answer with a 302 to this URL instead of JSON

    The cooked recipe is removed from today's stored picks and, if that leaves
    fewer than the stored count, one random recipe that is neither already
    picked nor within TANDOOR_COOLDOWN_DAYS is appended.
    """
    import hmac

    if GLANCE_HELPER_KEY:
        # Constant-time comparison so response timing does not reveal how much
        # of a guessed key is correct. Bytes are compared because
        # compare_digest rejects non-ASCII str arguments.
        supplied = (key or "").encode("utf-8")
        if not hmac.compare_digest(supplied, GLANCE_HELPER_KEY.encode("utf-8")):
            raise HTTPException(status_code=403, detail="Invalid key")

    today = _today_str()
    # Record in permanent history
    history = _record_cooked(id)
    recently_cooked = _get_recently_cooked_ids(history, TANDOOR_COOLDOWN_DAYS)

    # Remove from today's picks and try to refill
    picks = _load_json(PICKS_PATH, {})
    if picks.get("date") == today and isinstance(picks.get("ids"), list):
        ids = [x for x in picks["ids"] if x != id]
        target = int(picks.get("count") or len(ids))
        if len(ids) < target:
            recipes = _fetch_all_recipes()
            avoid = set(ids) | recently_cooked
            candidates = [r.get("id") for r in recipes
                          if r.get("id") is not None
                          and r.get("id") not in avoid]
            if candidates:
                ids.append(random.choice(candidates))
        picks["ids"] = ids[:target]
        _save_json(PICKS_PATH, picks)

    if redirect:
        # NOTE(review): the target is caller-supplied and not validated, so
        # this is an open redirect (for anyone when no key is configured).
        # Consider restricting it to known dashboard origins.
        return RedirectResponse(url=redirect, status_code=302)
    return {
        "ok": True,
        "date": today,
        "cooked_id": id,
        "cooked_total": _get_cooked_count(history, id)
    }
@APP.get("/tandoor/history")
def tandoor_history():
    """Return the full cooked history with per-recipe counts and a grand total."""
    history = _load_cooked_history()
    per_recipe = {}
    for recipe_key, dates in history.items():
        per_recipe[int(recipe_key)] = {"dates": dates, "count": len(dates)}
    grand_total = sum(len(dates) for dates in history.values())
    return {"recipes": per_recipe, "total_cooks": grand_total}
# ================================
# Version Checker - Container Version Monitoring
# ================================
# Scrapes version-checker Prometheus metrics, parses versions,
# and returns properly sorted results with the NEWEST available version.
# Metrics endpoint fetched by _fetch_version_checker_data(); overridable via env.
VERSION_CHECKER_URL = os.getenv("VERSION_CHECKER_URL", "http://version-checker.version-checker-system.svc.cluster.local:8080/metrics")
# Regex patterns for filtering (same as in Glance widget)
# Images whose name matches this pattern are left out of the report;
# an empty value disables the filter.
VERSION_CHECKER_EXCLUDE_IMAGES = os.getenv("VERSION_CHECKER_EXCLUDE_IMAGES",
    r"(^|.*/)(busybox|redis|alpine|nginx|mariadb|mysql|valkey)$|^longhornio.*$|(^|.*/)postgres.*$|^registry\.k8s\.io/ingress-nginx/.*$"
)
def _parse_semver(version: str) -> tuple:
"""
Parse version string into comparable tuple.
Handles: v1.2.3, 1.2.3, v1.2.3-beta, 1.2.3-20251030, etc.
Returns tuple that sorts correctly: (major, minor, patch, prerelease_flag, prerelease)
"""
if not version:
return (0, 0, 0, 0, "")
# Skip sha256 digests entirely
if "sha256:" in version or version.startswith("sha"):
return (0, 0, 0, 0, version)
# Remove 'v' prefix
v = version.lstrip("v")
# Remove @sha256:... suffix if present (e.g., "1.2.3@sha256:abc...")
if "@" in v:
v = v.split("@")[0]
# Split into version and prerelease/build metadata
# Handle: 1.2.3-beta, 1.2.3-20251030, 1.2.3+build
prerelease = ""
if "-" in v:
parts = v.split("-", 1)
v = parts[0]
prerelease = parts[1]
elif "+" in v:
parts = v.split("+", 1)
v = parts[0]
prerelease = parts[1]
# Parse major.minor.patch
segments = v.split(".")
try:
major = int(segments[0]) if len(segments) > 0 and segments[0].isdigit() else 0
minor = int(segments[1]) if len(segments) > 1 and segments[1].isdigit() else 0
patch = int(segments[2]) if len(segments) > 2 and segments[2].isdigit() else 0
except (ValueError, IndexError):
major, minor, patch = 0, 0, 0
# For sorting: no prerelease > prerelease (1.2.3 > 1.2.3-beta)
# prerelease_flag: 1 = no prerelease (higher), 0 = has prerelease (lower)
prerelease_flag = 1 if not prerelease else 0
return (major, minor, patch, prerelease_flag, prerelease)
def _compare_versions(v1: str, v2: str) -> int:
    """Three-way comparison of two version strings: -1 if v1<v2, 0 if equal, 1 if v1>v2."""
    left, right = _parse_semver(v1), _parse_semver(v2)
    return (left > right) - (left < right)
def _parse_prometheus_metrics(text: str) -> list[dict]:
"""
Parse Prometheus metrics text format.
Extracts version_checker_is_latest_version metrics.
"""
results = []
pattern = re.compile(
r'version_checker_is_latest_version\{([^}]+)\}\s+(\d+(?:\.\d+)?)'
)
for match in pattern.finditer(text):
labels_str = match.group(1)
value = float(match.group(2))
# Parse labels
labels = {}
# Handle labels like: container="foo",image="bar",current_version="1.0"
label_pattern = re.compile(r'(\w+)="([^"]*)"')
for label_match in label_pattern.finditer(labels_str):
labels[label_match.group(1)] = label_match.group(2)
results.append({
"labels": labels,
"value": value # 1 = up to date, 0 = outdated
})
return results
def _fetch_version_checker_data() -> dict:
    """
    Fetch the version-checker metrics and summarise them per image.

    Only samples with container_type "container" are used; images matching
    VERSION_CHECKER_EXCLUDE_IMAGES and samples whose current or latest version
    contains "sha256:" are skipped. An image is outdated as soon as one of its
    samples is not 1; its reported latest version is the highest of the
    latest_version labels seen on those samples, ordered by _parse_semver.

    Returns a dict with "fetched_at_unix", "summary" (counts) and the
    "outdated" and "up_to_date" lists, both sorted by image name.

    Raises:
        HTTPException(502) when the metrics cannot be fetched or parsed.
    """
    try:
        resp = requests.get(VERSION_CHECKER_URL, timeout=30)
        resp.raise_for_status()
        samples = _parse_prometheus_metrics(resp.text)
    except Exception as e:
        print(f"Version checker fetch error: {e}")
        raise HTTPException(status_code=502, detail=f"Failed to fetch version-checker: {e}")

    skip_image = re.compile(VERSION_CHECKER_EXCLUDE_IMAGES) if VERSION_CHECKER_EXCLUDE_IMAGES else None

    # image -> {"current_version": str, "latest_versions": [str], "is_latest": bool}
    by_image: Dict[str, Dict] = {}
    for sample in samples:
        labels = sample["labels"]
        image = labels.get("image", "")
        current = labels.get("current_version", "")
        latest = labels.get("latest_version", "")

        # Only regular containers are reported (init containers are left out).
        if labels.get("container_type", "") != "container":
            continue
        if skip_image is not None and skip_image.search(image):
            continue
        # Digest-pinned versions cannot be compared meaningfully.
        if "sha256:" in current or "sha256:" in latest:
            continue

        # The first sample seen for an image fixes its current_version.
        entry = by_image.setdefault(image, {
            "current_version": current,
            "latest_versions": [],
            "is_latest": True
        })
        if sample["value"] != 1:
            entry["is_latest"] = False
            if latest and latest not in entry["latest_versions"]:
                entry["latest_versions"].append(latest)

    up_to_date, outdated = [], []
    for image, entry in by_image.items():
        current = entry["current_version"]
        if entry["is_latest"]:
            up_to_date.append({
                "image": image,
                "current_version": current,
                "latest_version": current
            })
            continue
        candidates = entry["latest_versions"]
        # Highest version wins; without any candidate, fall back to current.
        newest = max(candidates, key=_parse_semver) if candidates else current
        outdated.append({
            "image": image,
            "current_version": current,
            "latest_version": newest,
            "all_newer_versions": len(candidates)
        })

    # Sort both lists by image name for consistent ordering.
    by_name = lambda item: item["image"]
    outdated.sort(key=by_name)
    up_to_date.sort(key=by_name)

    return {
        "fetched_at_unix": int(time.time()),
        "summary": {
            "up_to_date": len(up_to_date),
            "outdated": len(outdated),
            "total": len(up_to_date) + len(outdated)
        },
        "outdated": outdated,
        "up_to_date": up_to_date
    }
@APP.get("/versions")
def versions_api():
    """
    Return the complete container version report as UTF-8 JSON.
    The response shape is meant for a Glance custom-api widget.
    """
    body = json.dumps(_fetch_version_checker_data(), ensure_ascii=False)
    return Response(content=body, media_type="application/json; charset=utf-8")
@APP.get("/versions/outdated")
def versions_outdated():
    """Return only the outdated images, plus the summary, as UTF-8 JSON."""
    report = _fetch_version_checker_data()
    trimmed = {
        "fetched_at_unix": report["fetched_at_unix"],
        "summary": report["summary"],
        "images": report["outdated"]
    }
    return Response(
        content=json.dumps(trimmed, ensure_ascii=False),
        media_type="application/json; charset=utf-8"
    )
# ================================
# Google Calendar iCal Integration
# ================================
from icalendar import Calendar as ICalCalendar
from dateutil.rrule import rrulestr
from dateutil import parser as dateutil_parser
# Calendar configuration - loaded from environment.
# Raw JSON text mapping a calendar name to its iCal feed URL; it is decoded on
# demand by _parse_ical_urls(). Empty means no calendars are configured.
CALENDAR_ICAL_URLS = os.getenv("CALENDAR_ICAL_URLS", "") # JSON: {"name": "url", ...}
def _format_relative_date(dt: datetime, tz) -> str:
"""Format date as 'Today', 'Tomorrow', or 'Mon 24' style."""
now = datetime.now(tz)
today = now.date()
tomorrow = today + timedelta(days=1)
event_date = dt.date()
if event_date == today:
return "Today"
elif event_date == tomorrow:
return "Tomorrow"
else:
return dt.strftime("%b %d")
def _parse_ical_urls() -> dict[str, str]:
    """Decode the CALENDAR_ICAL_URLS setting into a {name: url} mapping.

    Returns {} when the setting is empty, is not valid JSON, or decodes to
    something other than a JSON object; the latter two cases are logged.
    Rejecting non-objects here keeps callers from failing on ``.items()``.
    """
    if not CALENDAR_ICAL_URLS:
        return {}
    try:
        parsed = json.loads(CALENDAR_ICAL_URLS)
    except Exception as e:
        print(f"Error parsing CALENDAR_ICAL_URLS: {e}")
        return {}
    if not isinstance(parsed, dict):
        print(f"Error parsing CALENDAR_ICAL_URLS: expected a JSON object, got {type(parsed).__name__}")
        return {}
    return parsed
def _fetch_ical(url: str, timeout: int = 15) -> str:
    """Download an iCal feed and return its text.

    Any failure (connection error, HTTP error status, ...) is printed and
    reported to the caller as an empty string; nothing is raised.
    """
    request_headers = {"User-Agent": UA}
    try:
        resp = requests.get(url, timeout=timeout, headers=request_headers)
        resp.raise_for_status()
        return resp.text
    except Exception as e:
        # NOTE(review): the full feed URL is written to the log here; if feed
        # URLs embed access tokens they will end up in the logs — confirm.
        print(f"Error fetching iCal from {url}: {e}")
        return ""
def _parse_ical_events(ical_text: str, calendar_name: str, days_ahead: int = 30) -> list[dict]:
"""
Parse iCal text and return upcoming events.
Handles both single events and recurring events.
"""
events = []
if not ical_text:
return events
try:
tz = ZoneInfo("Europe/Budapest")
except Exception:
tz = timezone.utc
now = datetime.now(tz)
cutoff = now + timedelta(days=days_ahead)
try:
cal = ICalCalendar.from_ical(ical_text)
for component in cal.walk():
if component.name != "VEVENT":
continue
summary = str(component.get("SUMMARY", "No title"))
description = str(component.get("DESCRIPTION", "")) if component.get("DESCRIPTION") else ""
location = str(component.get("LOCATION", "")) if component.get("LOCATION") else ""
uid = str(component.get("UID", ""))
# Get start and end times
dtstart = component.get("DTSTART")
dtend = component.get("DTEND")
if not dtstart:
continue
dtstart_val = dtstart.dt
dtend_val = dtend.dt if dtend else None
# Handle all-day events (date vs datetime)
is_all_day = not isinstance(dtstart_val, datetime)
if is_all_day:
# All-day event - convert date to datetime
dtstart_val = datetime.combine(dtstart_val, datetime.min.time(), tzinfo=tz)
if dtend_val and not isinstance(dtend_val, datetime):
dtend_val = datetime.combine(dtend_val, datetime.min.time(), tzinfo=tz)
else:
# Make sure datetime is timezone-aware
if dtstart_val.tzinfo is None:
dtstart_val = dtstart_val.replace(tzinfo=tz)
else:
dtstart_val = dtstart_val.astimezone(tz)
if dtend_val:
if dtend_val.tzinfo is None:
dtend_val = dtend_val.replace(tzinfo=tz)
else:
dtend_val = dtend_val.astimezone(tz)
# Check for recurring events
rrule = component.get("RRULE")
if rrule:
# Handle recurring events
try:
rrule_str = rrule.to_ical().decode("utf-8")
rule = rrulestr(rrule_str, dtstart=dtstart_val)
# Get occurrences within our window
occurrences = list(rule.between(now, cutoff, inc=True))[:10] # Limit to 10 occurrences
for occ in occurrences:
if occ.tzinfo is None:
occ = occ.replace(tzinfo=tz)
# Calculate end time for this occurrence
if dtend_val and dtstart_val:
duration = dtend_val - dtstart_val
occ_end = occ + duration
else:
occ_end = None
events.append({
"title": summary,
"description": description,
"location": location,
"calendar": calendar_name,
"start": occ.isoformat(),
"start_unix": int(occ.timestamp()),
"start_date": occ.strftime("%b %d"),
"start_date_relative": _format_relative_date(occ, tz),
"start_time": occ.strftime("%H:%M"),
"start_weekday": occ.strftime("%a"),
"end": occ_end.isoformat() if occ_end else None,
"end_unix": int(occ_end.timestamp()) if occ_end else None,
"is_all_day": is_all_day,
"uid": uid
})
except Exception as e:
print(f"Error parsing RRULE for event '{summary}': {e}")
# Fall back to single event
if now <= dtstart_val <= cutoff:
events.append({
"title": summary,
"description": description,
"location": location,
"calendar": calendar_name,
"start": dtstart_val.isoformat(),
"start_unix": int(dtstart_val.timestamp()),
"start_date": dtstart_val.strftime("%b %d"),
"start_date_relative": _format_relative_date(dtstart_val, tz),
"start_time": dtstart_val.strftime("%H:%M"),
"start_weekday": dtstart_val.strftime("%a"),
"end": dtend_val.isoformat() if dtend_val else None,
"end_unix": int(dtend_val.timestamp()) if dtend_val else None,
"is_all_day": is_all_day,
"uid": uid
})
else:
# Single event
if now <= dtstart_val <= cutoff:
events.append({
"title": summary,
"description": description,
"location": location,
"calendar": calendar_name,
"start": dtstart_val.isoformat(),
"start_unix": int(dtstart_val.timestamp()),
"start_date": dtstart_val.strftime("%b %d"),
"start_date_relative": _format_relative_date(dtstart_val, tz),
"start_time": dtstart_val.strftime("%H:%M"),
"start_weekday": dtstart_val.strftime("%a"),
"end": dtend_val.isoformat() if dtend_val else None,
"end_unix": int(dtend_val.timestamp()) if dtend_val else None,
"is_all_day": is_all_day,
"uid": uid
})
except Exception as e:
print(f"Error parsing iCal for {calendar_name}: {e}")
return events
@APP.get("/calendar/events")
def calendar_events_api(
    count: int = Query(default=5, ge=1, le=50, description="Number of events to return"),
    days: int = Query(default=30, ge=1, le=365, description="Days to look ahead"),
    calendars: str = Query(default="", description="Comma-separated list of calendar names to include (empty = all)")
):
    """
    Return upcoming events from the configured iCal feeds as UTF-8 JSON.
    Events of all selected calendars are merged, ordered by start time and
    cut to 'count' entries. 'calendars' narrows the feeds by name
    (e.g. calendars=Work,Family); names that are not configured are ignored.
    A feed that cannot be fetched is reported with status "error".
    """
    json_type = "application/json; charset=utf-8"

    configured = _parse_ical_urls()
    if not configured:
        nothing = {
            "error": "No calendars configured",
            "events": [],
            "count": 0,
            "fetched_at_unix": int(time.time())
        }
        return Response(content=json.dumps(nothing, ensure_ascii=False), media_type=json_type)

    # Filter calendars if specified
    selected = configured
    if calendars.strip():
        wanted = [part.strip() for part in calendars.split(",") if part.strip()]
        selected = {name: url for name, url in configured.items() if name in wanted}

    merged = []
    per_calendar = {}
    for name, url in selected.items():
        ical_text = _fetch_ical(url)
        if not ical_text:
            per_calendar[name] = {"status": "error", "events": 0}
            continue
        found = _parse_ical_events(ical_text, name, days)
        merged.extend(found)
        per_calendar[name] = {"status": "ok", "events": len(found)}

    # Sort by start time and limit
    merged.sort(key=lambda event: event["start_unix"])
    shown = merged[:count]

    payload = {
        "events": shown,
        "count": len(shown),
        "total_available": len(merged),
        "calendars": per_calendar,
        "fetched_at_unix": int(time.time())
    }
    return Response(content=json.dumps(payload, ensure_ascii=False), media_type=json_type)
# ================================
# User Data Management (Notes, Todos, Motivation)
# ================================
# File structure per user: /data/userdata-{username}.json
# {
# "notes": "free text...",
# "todos": [{"id": "uuid", "text": "...", "done": false, "created": "..."}],
# "motivation": ["quote1", "quote2", ...]
# }
import uuid as uuid_lib
from pydantic import BaseModel
from typing import Optional, List
# Request body used when a todo item is created or updated.
class TodoItem(BaseModel):
    text: str  # todo text; the handlers strip surrounding whitespace before storing
    done: bool = False  # completion flag; a todo is open unless stated otherwise
# Model with a single text field.
# NOTE(review): presumably the request body for adding a motivation quote;
# no use of it is visible in this part of the file — confirm.
class MotivationItem(BaseModel):
    text: str
# Request body for replacing a user's notes.
class NotesUpdate(BaseModel):
    content: str  # stored as the user's "notes" value, replacing the previous text
def _userdata_path(user: str) -> Path:
    """Return the data file path for ``user``.

    Every character other than ASCII letters, digits, "_" and "-" is removed
    from the name first, which prevents path traversal.

    Raises:
        HTTPException(400) when nothing is left of the name.
    """
    cleaned = re.sub(r'[^a-zA-Z0-9_-]', '', user)
    if cleaned:
        return DATA_DIR / f"userdata-{cleaned}.json"
    raise HTTPException(status_code=400, detail="Invalid username")
def _load_userdata(user: str) -> dict:
    """Load the user's document, filling in defaults for missing parts.

    When no stored document can be loaded, the template below (empty notes,
    no todos, five starter quotes) is returned. Keys missing from a stored
    document are filled in from the template.
    """
    template = {
        "notes": "",
        "todos": [],
        "motivation": [
            "Believe in yourself!",
            "Every day is a new opportunity.",
            "You've got this!",
            "Small steps lead to big changes.",
            "Stay focused, stay positive."
        ]
    }
    data = _load_json(_userdata_path(user), template)
    # Ensure all keys exist
    for field, fallback in template.items():
        data.setdefault(field, fallback)
    return data
def _save_userdata(user: str, data: dict) -> None:
    """Write ``data`` to the user's data file."""
    _save_json(_userdata_path(user), data)
def _verify_key(key: str = Query(default="")):
    """Verify the API key for write operations.

    Returns True when no GLANCE_HELPER_KEY is configured or when ``key``
    matches it.

    Raises:
        HTTPException(403) when a key is configured and ``key`` differs.
    """
    import hmac

    if not GLANCE_HELPER_KEY:
        return True  # No key configured, allow all
    # Anything that is not a string (e.g. the unresolved Query default when
    # this is called without an argument) can never match.
    supplied = key if isinstance(key, str) else ""
    # Constant-time comparison so response timing does not reveal how much of
    # a guessed key is correct. Bytes are compared because compare_digest
    # rejects non-ASCII str arguments.
    if not hmac.compare_digest(supplied.encode("utf-8"), GLANCE_HELPER_KEY.encode("utf-8")):
        raise HTTPException(status_code=403, detail="Invalid API key")
    return True
# ========== GET ALL USER DATA ==========
@APP.get("/userdata/{user}")
def get_userdata(user: str):
    """Return the user's whole document (notes, todos, motivation) as UTF-8 JSON."""
    body = json.dumps(_load_userdata(user), ensure_ascii=False)
    return Response(content=body, media_type="application/json; charset=utf-8")
# ========== NOTES ==========
@APP.get("/userdata/{user}/notes")
def get_notes(user: str):
    """Return the user's notes text as {"notes": ...}."""
    notes = _load_userdata(user).get("notes", "")
    body = json.dumps({"notes": notes}, ensure_ascii=False)
    return Response(content=body, media_type="application/json; charset=utf-8")
@APP.post("/userdata/{user}/notes")
def update_notes(user: str, body: NotesUpdate, key: str = Query(default="")):
    """Replace the user's notes with the submitted content (API key required if configured)."""
    _verify_key(key)
    data = _load_userdata(user)
    data["notes"] = body.content
    _save_userdata(user, data)
    result = {"success": True, "notes": data["notes"]}
    return Response(
        content=json.dumps(result, ensure_ascii=False),
        media_type="application/json; charset=utf-8"
    )
# ========== TODOS ==========
@APP.get("/userdata/{user}/todos")
def get_todos(user: str):
    """Return the user's todo list together with its length."""
    todos = _load_userdata(user).get("todos", [])
    result = {"todos": todos, "count": len(todos)}
    return Response(
        content=json.dumps(result, ensure_ascii=False),
        media_type="application/json; charset=utf-8"
    )
@APP.post("/userdata/{user}/todos")
def add_todo(user: str, body: TodoItem, key: str = Query(default="")):
    """Append a new todo item (API key required if configured).

    The item gets an 8-character id taken from a random UUID and a UTC
    creation timestamp; its text is stored with surrounding whitespace removed.
    """
    _verify_key(key)
    data = _load_userdata(user)
    created = {
        "id": str(uuid_lib.uuid4())[:8],
        "text": body.text.strip(),
        "done": body.done,
        "created": datetime.now(timezone.utc).isoformat()
    }
    # A missing or empty list is replaced by a fresh one before appending.
    data["todos"] = data.get("todos") or []
    data["todos"].append(created)
    _save_userdata(user, data)
    result = {"success": True, "todo": created}
    return Response(
        content=json.dumps(result, ensure_ascii=False),
        media_type="application/json; charset=utf-8"
    )
@APP.put("/userdata/{user}/todos/{todo_id}")
def update_todo(user: str, todo_id: str, body: TodoItem, key: str = Query(default="")):
    """Overwrite the text and done flag of the todo whose id is ``todo_id``.

    The ``key`` query parameter is checked by ``_verify_key`` first. Only the
    first entry with a matching id is modified. Raises ``HTTPException`` with
    status 404 when no entry matches.
    """
    _verify_key(key)
    stored = _load_userdata(user)

    candidates = (item for item in stored.get("todos", []) if item.get("id") == todo_id)
    match = next(candidates, None)
    if match is None:
        raise HTTPException(status_code=404, detail="Todo not found")

    match["text"] = body.text.strip()
    match["done"] = body.done
    _save_userdata(user, stored)

    payload = {"success": True, "todo": match}
    return Response(
        content=json.dumps(payload, ensure_ascii=False),
        media_type="application/json; charset=utf-8",
    )
@APP.delete("/userdata/{user}/todos/{todo_id}")
def delete_todo(user: str, todo_id: str, key: str = Query(default="")):
    """Remove every todo whose id equals ``todo_id`` from ``user``'s list.

    The ``key`` query parameter is checked by ``_verify_key`` first. Raises
    ``HTTPException`` with status 404 when nothing was removed; in that case
    nothing is saved.
    """
    _verify_key(key)
    stored = _load_userdata(user)

    before = len(stored.get("todos", []))
    stored["todos"] = [
        item for item in stored.get("todos", []) if item.get("id") != todo_id
    ]
    # An unchanged length means no entry carried the requested id.
    if before == len(stored["todos"]):
        raise HTTPException(status_code=404, detail="Todo not found")

    _save_userdata(user, stored)
    payload = {"success": True, "deleted": todo_id}
    return Response(
        content=json.dumps(payload, ensure_ascii=False),
        media_type="application/json; charset=utf-8",
    )
# ========== MOTIVATION ==========
@APP.get("/userdata/{user}/motivation")
def get_motivation(user: str):
    """Return all motivation quotes stored for ``user`` and how many there are.

    Responds with ``{"quotes": [...], "count": N}``; a missing ``motivation``
    entry is treated as an empty list.
    """
    stored_quotes = _load_userdata(user).get("motivation", [])
    payload = {"quotes": stored_quotes, "count": len(stored_quotes)}
    return Response(
        content=json.dumps(payload, ensure_ascii=False),
        media_type="application/json; charset=utf-8",
    )
@APP.get("/userdata/{user}/motivation/random")
def get_random_motivation(user: str):
    """Return one randomly chosen motivation quote for ``user``.

    Responds with the quote, its index in the stored list and the list
    length. When no quotes are stored, a placeholder message is returned with
    index ``-1`` and total ``0``.
    """
    stored_quotes = _load_userdata(user).get("motivation", [])

    if stored_quotes:
        pick = random.randint(0, len(stored_quotes) - 1)
        payload = {
            "quote": stored_quotes[pick],
            "index": pick,
            "total": len(stored_quotes),
        }
    else:
        payload = {
            "quote": "Add some motivation quotes!",
            "index": -1,
            "total": 0,
        }

    return Response(
        content=json.dumps(payload, ensure_ascii=False),
        media_type="application/json; charset=utf-8",
    )
@APP.post("/userdata/{user}/motivation")
def add_motivation(user: str, body: MotivationItem, key: str = Query(default="")):
    """Store a new motivation quote for ``user`` unless it is blank or a duplicate.

    The ``key`` query parameter is checked by ``_verify_key`` first. The
    response always reports success and returns the resulting quote list and
    its length, whether or not a quote was actually added.
    """
    _verify_key(key)
    stored = _load_userdata(user)

    # A missing, None or empty "motivation" entry is replaced by a fresh list.
    stored["motivation"] = stored.get("motivation") or []

    candidate = body.text.strip()
    is_new = bool(candidate) and candidate not in stored["motivation"]
    if is_new:
        stored["motivation"].append(candidate)
        _save_userdata(user, stored)

    payload = {
        "success": True,
        "quotes": stored["motivation"],
        "count": len(stored["motivation"]),
    }
    return Response(
        content=json.dumps(payload, ensure_ascii=False),
        media_type="application/json; charset=utf-8",
    )
@APP.delete("/userdata/{user}/motivation/{index}")
def delete_motivation(user: str, index: int, key: str = Query(default="")):
    """Remove the motivation quote at position ``index`` for ``user``.

    The ``key`` query parameter is checked by ``_verify_key`` first. Raises
    ``HTTPException`` with status 404 when ``index`` is negative or past the
    end of the list. The response contains the removed quote plus the
    remaining quotes and their count.
    """
    _verify_key(key)
    stored = _load_userdata(user)
    remaining = stored.get("motivation", [])

    if not 0 <= index < len(remaining):
        raise HTTPException(status_code=404, detail="Quote not found")

    # pop() mutates the list held inside `stored`, so saving persists the removal.
    removed = remaining.pop(index)
    _save_userdata(user, stored)

    payload = {
        "success": True,
        "deleted": removed,
        "quotes": remaining,
        "count": len(remaining),
    }
    return Response(
        content=json.dumps(payload, ensure_ascii=False),
        media_type="application/json; charset=utf-8",
    )
---
# glance-helper: runs the FastAPI app from the glance-helper-app ConfigMap
# (mounted at /app) on a stock Python image, installing dependencies at start.
apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    argocd.argoproj.io/tracking-id: glance:apps/Deployment:glance-system/glance-helper
    reloader.stakater.com/auto: "true"
  name: glance-helper
  namespace: glance-system
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app: glance-helper
  # Recreate stops the old pod before starting the new one; the pod mounts the
  # ReadWriteOnce claim glance-helper-data.
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        app: glance-helper
        # The Services in this file select on this label.
        app.kubernetes.io/name: glance-helper
    spec:
      containers:
        - name: glance-helper
          image: python:3.12-bookworm
          imagePullPolicy: IfNotPresent
          workingDir: /app
          command:
            - /bin/sh
            - -lc
          # Startup script: installs OS tools and Python packages on every
          # container start, then serves app:APP with uvicorn on port 8000.
          # NOTE(review): the pip packages are unpinned, so each restart may
          # pull different versions; consider pinning or a prebuilt image.
          args:
            - |-
              set -eux;
              export DEBIAN_FRONTEND=noninteractive;
              apt-get update;
              apt-get install -y --no-install-recommends curl ca-certificates iputils-ping dnsutils tzdata net-tools;
              rm -rf /var/lib/apt/lists/*;
              pip install --no-cache-dir fastapi uvicorn requests beautifulsoup4 prometheus-client icalendar python-dateutil pydantic;
              python -c "import uvicorn; uvicorn.run('app:APP', host='0.0.0.0', port=8000)"
          env:
            - name: IDOKEP_URL
              value: https://www.idokep.hu/idojaras/Budapest%20VII.%20ker
            - name: PLACE_NAME
              value: Budapest VII. ker
            - name: TANDOOR_INTERNAL_URL
              value: http://tandoor.tandoor-system.svc.cluster.local:8080
            - name: TANDOOR_PUBLIC_URL
              value: https://tandoor.dooplex.hu
            # NOTE(review): credential stored in plain text in version control.
            # Move it to a Secret referenced via valueFrom.secretKeyRef and
            # rotate the token.
            - name: TANDOOR_TOKEN
              value: tda_8a8b169c_5d1f_4962_83a2_0f2719c7d61a
            - name: GLANCE_HELPER_PUBLIC_URL
              value: https://glance-helper.dooplex.hu
            - name: DATA_DIR
              value: /data
            # NOTE(review): API key stored in plain text in version control.
            # Move it to a Secret (secretKeyRef) and rotate it. Do not leave it
            # empty: the app's key check allows every request when no key is set.
            - name: GLANCE_HELPER_KEY
              value: oplQqnLnJK2vErRVYJpvVUcSDBOSbCHZSbsYY2bwSifgTMfT
            - name: TZ
              value: Europe/Budapest
            # Version Checker configuration
            - name: VERSION_CHECKER_URL
              value: http://version-checker.version-checker-system.svc.cluster.local:8080/metrics
            - name: VERSION_CHECKER_EXCLUDE_IMAGES
              value: "(^|.*/)(busybox|redis|alpine|nginx|mariadb|mysql|valkey)$|^longhornio.*$|(^|.*/)postgres.*$|^registry\\.k8s\\.io/ingress-nginx/.*$"
            # Calendar iCal URLs (JSON object: {"name": "url", ...})
            # NOTE(review): these URLs contain "private-..." path segments that
            # look like access tokens; treat them as secrets (Secret +
            # secretKeyRef).
            - name: CALENDAR_ICAL_URLS
              value: '{"Órák": "https://calendar.google.com/calendar/ical/b2884faf3db792ac082a6206057552c79080716efd5f966e169a41fc500e1c1c%40group.calendar.google.com/private-0998d8053909ba4449c2f0a6409ce3de/basic.ics", "Családi": "https://calendar.google.com/calendar/ical/nitq3l0if4gn54k438obat5ia0%40group.calendar.google.com/private-59afcf70fee1a798ec369b86d9883b46/basic.ics"}'
          ports:
            - containerPort: 8000
              protocol: TCP
          # The container spends its first stretch in apt/pip install and only
          # then opens port 8000. Without a readiness probe the Services would
          # route traffic to it during that window. A TCP check only gates
          # Service endpoints; it never restarts the container.
          readinessProbe:
            tcpSocket:
              port: 8000
            periodSeconds: 10
            failureThreshold: 3
          # NOTE(review): no requests/limits are set; consider adding them once
          # typical usage is known.
          resources: {}
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          volumeMounts:
            - mountPath: /app
              name: app
            - mountPath: /data
              name: data
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      terminationGracePeriodSeconds: 30
      volumes:
        - name: app
          configMap:
            # 420 decimal == 0644 octal
            defaultMode: 420
            name: glance-helper-app
        - name: data
          persistentVolumeClaim:
            claimName: glance-helper-data
---
# ClusterIP Service exposing the glance-helper pod's HTTP port 8000.
apiVersion: v1
kind: Service
metadata:
  name: glance-helper
  namespace: glance-system
spec:
  ports:
    - name: http
      port: 8000
      targetPort: 8000
  # Matches the app.kubernetes.io/name label on the glance-helper pod template.
  selector:
    app.kubernetes.io/name: glance-helper
---
# Second Service name pointing at the same glance-helper pods on port 8000.
# NOTE(review): presumably kept so existing clients of the "idokep-scraper"
# name keep working; confirm before removing.
apiVersion: v1
kind: Service
metadata:
  name: idokep-scraper
  namespace: glance-system
spec:
  ports:
    - name: http
      port: 8000
      targetPort: 8000
  # Same selector as the glance-helper Service.
  selector:
    app.kubernetes.io/name: glance-helper
---
# Routes https://glance-helper.dooplex.hu/ to the glance-helper Service
# through the nginx-internal ingress class.
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: glance-helper
  namespace: glance-system
  annotations:
    # Certificate for the TLS secret below is requested from this issuer.
    cert-manager.io/cluster-issuer: letsencrypt-prod
    external-dns.alpha.kubernetes.io/hostname: glance-helper.dooplex.hu
    nginx.ingress.kubernetes.io/ssl-redirect: "true"
spec:
  ingressClassName: nginx-internal
  tls:
    - secretName: glance-helper-tls
      hosts:
        - glance-helper.dooplex.hu
  rules:
    - host: glance-helper.dooplex.hu
      http:
        paths:
          # Prefix match on "/" sends every path to the backend.
          - path: /
            pathType: Prefix
            backend:
              service:
                name: glance-helper
                port:
                  number: 8000