Files
homelab-manifests/glance-system/glance-helper.yaml
T
2026-01-23 09:14:56 +01:00

1055 lines
38 KiB
YAML

# Persistent storage claimed for the glance-helper pod; the Deployment in this
# file mounts it at /data (notes files and the Tandoor JSON state live there).
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: glance-helper-data
namespace: glance-system
spec:
accessModes:
# Single-node read/write access.
- ReadWriteOnce
resources:
requests:
storage: 200Mi
---
apiVersion: v1
kind: ConfigMap
metadata:
name: glance-helper-app
namespace: glance-system
data:
app.py: |-
import os
import time
import re
from typing import List, Dict, Any, Optional
import requests
from bs4 import BeautifulSoup
from fastapi import FastAPI, Response
from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST
# ASGI application object; every route below registers itself on it through
# the @APP.get / @APP.post decorators.
APP = FastAPI()
# ================================
# Simple Notes Widget - Multi-user
# ================================
def get_notes_file(user: str) -> str:
    """Return the path of the notes file that belongs to ``user``.

    Every character outside ``[a-zA-Z0-9_-]`` is removed from the name so it
    cannot point outside the data directory; a name that ends up empty maps
    to ``"default"``.  The directory comes from the ``DATA_DIR`` environment
    variable and falls back to ``/data``.
    """
    cleaned = re.sub(r'[^a-zA-Z0-9_-]', '', user) or "default"
    base_dir = os.environ.get("DATA_DIR", "/data")
    return os.path.join(base_dir, f"notes_{cleaned}.txt")
def load_notes(user: str) -> str:
    """Return the stored notes of ``user``.

    An empty string is returned when the notes file does not exist or when
    reading it fails; failures are printed, never raised.
    """
    path = get_notes_file(user)
    try:
        if not os.path.exists(path):
            return ""
        with open(path, "r", encoding="utf-8") as handle:
            return handle.read()
    except Exception as e:
        print(f"Error loading notes for {user}: {e}")
        return ""
def save_notes(user: str, content: str) -> bool:
    """Overwrite the notes file of ``user`` with ``content``.

    Returns ``True`` on success.  Any failure is printed and reported as
    ``False`` instead of being raised.
    """
    target = get_notes_file(user)
    try:
        with open(target, "w", encoding="utf-8") as handle:
            handle.write(content)
    except Exception as e:
        print(f"Error saving notes for {user}: {e}")
        return False
    return True
@APP.get("/notes")
def notes_widget(key: str = "", user: str = "default"):
    """Serve the auto-saving notes widget page for one user.

    Query parameters:
        key:  must equal the ``GLANCE_HELPER_KEY`` environment variable,
              otherwise a 401 response is returned.
        user: notes owner; reduced to ``[a-zA-Z0-9_-]`` ("default" if empty).

    The page contains a textarea pre-filled with the stored notes and a
    script that POSTs changes to ``/notes/save`` (debounced on input, and on
    blur / beforeunload).
    """
    import json
    from html import escape

    expected_key = os.environ.get("GLANCE_HELPER_KEY", "")
    if key != expected_key:
        return Response(content="Unauthorized", status_code=401)
    # Sanitize user for display
    safe_user = re.sub(r'[^a-zA-Z0-9_-]', '', user) or "default"
    # Escape for safe HTML embedding inside the <textarea>.
    escaped_notes = escape(load_notes(safe_user), quote=True)
    # Embed values as JSON string literals (valid JavaScript literals) rather
    # than pasting them between quotes: a key containing a quote or backslash
    # would otherwise break the script.  "</" is split so that the value can
    # never terminate the surrounding <script> element.
    api_key_js = json.dumps(expected_key).replace("</", "<\\/")
    user_js = json.dumps(safe_user).replace("</", "<\\/")
    page = f"""<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<style>
* {{ margin: 0; padding: 0; box-sizing: border-box; }}
body {{
background: transparent;
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
height: 100vh;
display: flex;
flex-direction: column;
}}
.container {{
display: flex;
flex-direction: column;
height: 100%;
padding: 8px;
}}
.status {{
font-size: 11px;
color: rgba(255, 255, 255, 0.5);
padding: 4px 0;
text-align: right;
min-height: 20px;
}}
.status.saving {{ color: rgba(255, 200, 100, 0.8); }}
.status.saved {{ color: rgba(100, 255, 150, 0.8); }}
.status.error {{ color: rgba(255, 100, 100, 0.8); }}
textarea {{
flex: 1;
width: 100%;
background: rgba(255, 255, 255, 0.05);
border: 1px solid rgba(255, 255, 255, 0.1);
border-radius: 8px;
color: rgba(255, 255, 255, 0.9);
font-size: 14px;
line-height: 1.5;
padding: 12px;
resize: none;
outline: none;
}}
textarea:focus {{
border-color: rgba(180, 130, 220, 0.5);
background: rgba(255, 255, 255, 0.08);
}}
textarea::placeholder {{
color: rgba(255, 255, 255, 0.3);
}}
</style>
</head>
<body>
<div class="container">
<textarea id="notes" placeholder="Write your notes here...">{escaped_notes}</textarea>
<div class="status" id="status"></div>
</div>
<script>
const textarea = document.getElementById('notes');
const status = document.getElementById('status');
const apiKey = {api_key_js};
const user = {user_js};
let saveTimeout = null;
let lastSaved = textarea.value;
function updateStatus(text, className) {{
status.textContent = text;
status.className = 'status ' + (className || '');
}}
async function saveNotes() {{
const content = textarea.value;
if (content === lastSaved) return;
updateStatus('Saving...', 'saving');
try {{
const response = await fetch('/notes/save?key=' + encodeURIComponent(apiKey) + '&user=' + encodeURIComponent(user), {{
method: 'POST',
headers: {{ 'Content-Type': 'application/json' }},
body: JSON.stringify({{ content: content }})
}});
if (response.ok) {{
lastSaved = content;
updateStatus('Saved', 'saved');
setTimeout(() => updateStatus(''), 2000);
}} else {{
updateStatus('Save failed', 'error');
}}
}} catch (e) {{
updateStatus('Save failed', 'error');
}}
}}
textarea.addEventListener('input', () => {{
if (saveTimeout) clearTimeout(saveTimeout);
saveTimeout = setTimeout(saveNotes, 1000);
}});
textarea.addEventListener('blur', saveNotes);
window.addEventListener('beforeunload', saveNotes);
</script>
</body>
</html>"""
    return Response(content=page, media_type="text/html")
@APP.post("/notes/save")
async def save_notes_api(key: str = "", user: str = "default", content: dict = None):
    """API endpoint to save notes for a specific user.

    Query parameters:
        key:  must equal the ``GLANCE_HELPER_KEY`` environment variable.
        user: notes owner; reduced to ``[a-zA-Z0-9_-]`` ("default" if empty).
    Body:
        JSON object of the form ``{"content": "<text>"}``.

    Responses:
        200 ``{"status": "ok", "user": ...}`` when the notes were written,
        400 when the body is missing or ``content`` is not a string,
        401 when the key does not match,
        500 when writing the file failed.
    """
    expected_key = os.environ.get("GLANCE_HELPER_KEY", "")
    if key != expected_key:
        return Response(content="Unauthorized", status_code=401)
    # A malformed request is the client's fault: answer 400 instead of
    # reporting it as a server-side save failure.
    text = content.get("content") if isinstance(content, dict) else None
    if not isinstance(text, str):
        return Response(content="Invalid payload", status_code=400)
    safe_user = re.sub(r'[^a-zA-Z0-9_-]', '', user) or "default"
    if save_notes(safe_user, text):
        return {"status": "ok", "user": safe_user}
    return Response(content="Failed to save", status_code=500)
# ================================
# Időkép configuration
# ================================
# Forecast page that scrape() downloads; overridable through the environment.
IDOKEP_URL = os.getenv("IDOKEP_URL", "https://www.idokep.hu/idojaras/Budapest%20VII.%20ker")
# Location name reported in the /api payload and used as the "place" metric label.
PLACE_NAME = os.getenv("PLACE_NAME", "Budapest VII. ker")
# Source name reported in the /api payload.
SOURCE_NAME = "Időkép"
# User-Agent header sent with the scrape request.
UA = os.getenv("USER_AGENT", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome Safari")
# Prometheus metrics: scrape outcome counter (labels: place, status) and
# scrape duration histogram (label: place).
SCRAPES = Counter("idokep_scrapes_total", "Total Időkép scrapes", ["place", "status"])
SCRAPE_SECONDS = Histogram("idokep_scrape_seconds", "Időkép scrape duration in seconds", ["place"])
# Condition text for each daytime forecast-icon code.
_DAY_ICON_CONDITIONS = {
    "010": "Derült",
    "011": "Pára",
    "021": "Gyengén felhős",
    "022": "Közepesen felhős",
    "023": "Erősen felhős",
    "030": "Borult",
    "040": "Ködszitálás",
    "041": "Szitálás",
    "042": "Gyenge eső",
    "043": "Eső",
    "043s": "Eső viharos széllel",
    "051": "Havas eső",
    "052": "Ónos eső",
    "061": "Havazás",
    "062": "Havazás",
    "081": "Zápor",
    "083": "Hózápor",
    "088": "Jégeső",
    "090": "Zivatar",
    "092": "Száraz zivatar",
    "100": "Köd",
    "101": "Porvihar",
}
# Icon code -> condition text.  Nighttime icons use the daytime number
# shifted by 300 (010 -> 310, 100 -> 400, 043s -> 343s) and describe the
# same conditions, so their entries are derived instead of repeated.
ICON_CONDITIONS = {
    **_DAY_ICON_CONDITIONS,
    **{
        f"{int(code[:3]) + 300}{code[3:]}": text
        for code, text in _DAY_ICON_CONDITIONS.items()
    },
}
def _icon_condition(icon_url: str) -> str:
    """Translate a forecast icon URL into its condition text.

    The icon code is the three digits (optionally followed by ``s``) right
    before the ``.svg`` / ``.png`` extension, e.g.
    ``https://www.idokep.hu/assets/forecast-icons/043s.svg``.  An empty
    string is returned for a missing URL, a URL without such a code, or a
    code that is not in ``ICON_CONDITIONS``.
    """
    if not icon_url:
        return ""
    found = re.search(r'/(\d{3}s?)\.(svg|png)', icon_url)
    return ICON_CONDITIONS.get(found.group(1), "") if found else ""
def _abs_url(url):
    """Prefix a site-relative URL with the Időkép origin.

    Values that are empty/None or already start with ``http`` are returned
    unchanged.
    """
    if url and not url.startswith("http"):
        return "https://www.idokep.hu" + url
    return url
def _to_float(s):
    """Parse a scraped temperature such as ``"12˚C"`` or ``"-3,5°"``.

    Degree markers (``˚C``, ``°C``, ``°``, ``˚``) are removed, a decimal
    comma becomes a decimal point and the Unicode minus sign is accepted.
    Returns ``None`` when ``s`` is not a string or holds no number.
    """
    try:
        cleaned = (
            s.strip()
            .replace("˚C", "")
            .replace("°C", "")
            .replace("°", "")
            .replace("˚", "")
            .replace("−", "-")
            .replace(",", ".")
        )
        return float(cleaned)
    # AttributeError/TypeError: s is None or not a string;
    # ValueError: nothing numeric is left after cleaning.
    except (AttributeError, TypeError, ValueError):
        return None
def _calculate_gradient_data(daily_items):
    """Annotate daily forecast entries with temperature-bar geometry.

    The scale runs from the lowest ``tmin_c`` minus one degree to the highest
    ``tmax_c`` plus one degree.  Every entry that has both temperatures gets
    these float keys (percentages, without a "%" sign):

    * ``c_l``  - left offset of the bar on the scale
    * ``c_w``  - bar width (at least 2)
    * ``c_gw`` - width of the inner gradient relative to the bar
    * ``c_ml`` - negative left margin of the inner gradient
    * ``c_s1`` .. ``c_s5`` - scale positions of -10, 0, 15, 25 and 35 degrees
      (white, blue, purple, pink, red)

    The list is modified in place and returned; it is returned untouched when
    it is empty or when no minimum or no maximum is available.
    """
    if not daily_items:
        return daily_items

    lows = [day["tmin_c"] for day in daily_items if day["tmin_c"] is not None]
    highs = [day["tmax_c"] for day in daily_items if day["tmax_c"] is not None]
    if not (lows and highs):
        return daily_items

    g_min = min(lows) - 1.0
    g_max = max(highs) + 1.0
    span = g_max - g_min
    if span <= 0:
        span = 1.0

    def to_pct(temp):
        """Position of ``temp`` on the scale, in percent."""
        return (temp - g_min) / span * 100.0

    # The colour stops are identical for every day of the same forecast.
    colour_stops = {
        "c_s1": to_pct(-10),  # White
        "c_s2": to_pct(0),    # Blue
        "c_s3": to_pct(15),   # Purple
        "c_s4": to_pct(25),   # Pink
        "c_s5": to_pct(35),   # Red
    }

    for day in daily_items:
        low, high = day["tmin_c"], day["tmax_c"]
        if low is None or high is None:
            continue
        width = max((high - low) / span * 100.0, 2)
        left = to_pct(low)
        day.update({
            "c_l": left,
            "c_w": width,
            "c_gw": (100.0 / width) * 100.0,
            "c_ml": -(left / width) * 100.0,
        })
        day.update(colour_stops)
    return daily_items
def scrape() -> Dict[str, Any]:
    """Download the Időkép forecast page and extract the weather data.

    Returns a dict with the keys ``source``, ``location``, ``current``
    (temperature, condition text, icon URL), ``hourly`` (up to 8 entries),
    ``daily`` (up to 5 entries, enriched by ``_calculate_gradient_data``)
    and ``fetched_at_unix``.

    Any failure (network error, HTTP error status, parsing problem) is
    printed and re-raised.
    """
    try:
        r = requests.get(IDOKEP_URL, headers={"User-Agent": UA}, timeout=15)
        r.raise_for_status()
        soup = BeautifulSoup(r.text, "html.parser")

        # Current
        cur_temp_el = soup.select_one(".current-temperature")
        cur_cond_el = soup.select_one(".current-weather")
        cur_icon_el = soup.select_one(".forecast-bigicon")

        # Hourly
        hourly = []
        for card in soup.select(".ik.hourly-forecast-card")[:8]:
            hour_el = card.select_one(".ik.hourly-forecast-hour")
            temp_el = card.select_one(".ik.temperature-circled")
            icon_el = card.select_one("img.ik.forecast-icon")
            if not (hour_el and temp_el):
                continue
            # A card without an icon must not abort the whole scrape.
            icon_url = _abs_url(icon_el.get("src") if icon_el else None)
            hourly.append({
                "time": hour_el.get_text(strip=True),
                "temp_c": _to_float(temp_el.get_text(strip=True)),
                "icon_url": icon_url,
                "condition": _icon_condition(icon_url),
            })

        # Daily
        daily = []
        for col in soup.select(".ik.daily-forecast-container .ik.dailyForecastCol")[:15]:
            dow = col.select_one(".ik.dfDay")
            daynum = col.select_one(".ik.dfDayNum")
            icon = col.select_one("img.ik.forecast-icon")
            tmax = col.select_one("div.ik.max")
            tmin = col.select_one("div.ik.min")
            v_tmax, v_tmin = None, None
            if tmax and tmin:
                v_tmax = _to_float(tmax.get_text(strip=True))
                v_tmin = _to_float(tmin.get_text(strip=True))
            else:
                # Fallback for holiday layout: take the first two plain
                # integers among the min/max links (maximum first).
                vals = [a.get_text(strip=True) for a in col.select(".ik.min-max-container a")]
                vals = [v for v in vals if re.fullmatch(r"-?\d+", v)]
                if len(vals) >= 2:
                    v_tmax, v_tmin = _to_float(vals[0]), _to_float(vals[1])
            if v_tmax is None or v_tmin is None:
                continue
            icon_url = _abs_url(icon.get("src") if icon else None)
            daily.append({
                "daynum": daynum.get_text(strip=True) if daynum else "",
                "dow": dow.get_text(strip=True) if dow else "",
                "tmin_c": v_tmin,
                "tmax_c": v_tmax,
                "icon_url": icon_url,
                "condition": _icon_condition(icon_url),
            })
        daily = _calculate_gradient_data(daily[:5])

        return {
            "source": {"name": SOURCE_NAME, "url": IDOKEP_URL},
            "location": {"name": PLACE_NAME},
            "current": {
                "temp_c": _to_float(cur_temp_el.get_text(strip=True)) if cur_temp_el else 0,
                "condition": cur_cond_el.get_text(strip=True) if cur_cond_el else "",
                "icon_url": _abs_url(cur_icon_el.get("src")) if cur_icon_el else ""
            },
            "hourly": hourly,
            "daily": daily,
            "fetched_at_unix": int(time.time()),
        }
    except Exception as e:
        print(f"Scrape error: {e}")
        raise
@APP.get("/api")
def api():
    """Scrape the forecast and return it as UTF-8 JSON.

    Every call is timed into the ``idokep_scrape_seconds`` histogram and
    counted in ``idokep_scrapes_total`` with status ``ok`` or ``error``.
    A failed scrape is re-raised after being counted.
    """
    import json

    try:
        # The timer records the duration on success and on failure alike.
        with SCRAPE_SECONDS.labels(place=PLACE_NAME).time():
            data = scrape()
    except Exception:
        SCRAPES.labels(place=PLACE_NAME, status="error").inc()
        raise
    SCRAPES.labels(place=PLACE_NAME, status="ok").inc()
    return Response(content=json.dumps(data, ensure_ascii=False), media_type="application/json; charset=utf-8")
@APP.get("/metrics")
def metrics():
    """Expose the Prometheus metrics of this process in text format."""
    payload = generate_latest()
    return Response(payload, media_type=CONTENT_TYPE_LATEST)
# ================================
# Tandoor "Meal of the Day" - Enhanced Version
# ================================
from datetime import datetime, timezone, timedelta
from zoneinfo import ZoneInfo
from urllib.parse import urlencode
from fastapi import HTTPException, Query
from fastapi.responses import RedirectResponse
import json
import random
from pathlib import Path
# Base URL used for API calls to Tandoor (trailing slash removed).
TANDOOR_INTERNAL_URL = os.getenv("TANDOOR_INTERNAL_URL", "").rstrip("/")
# Base URL used for links and images handed to the browser.
TANDOOR_PUBLIC_URL = os.getenv("TANDOOR_PUBLIC_URL", "").rstrip("/")
# Public base URL of this service; used to build the "cook" links.
GLANCE_HELPER_PUBLIC_URL = os.getenv("GLANCE_HELPER_PUBLIC_URL", "").rstrip("/")
# Shared key; when empty, /tandoor/cook does not require a key.
GLANCE_HELPER_KEY = os.getenv("GLANCE_HELPER_KEY", "")
# Directory for the JSON state files; created at import time if missing.
DATA_DIR = Path(os.getenv("DATA_DIR", "/data"))
DATA_DIR.mkdir(parents=True, exist_ok=True)
# Cooked history and the picks chosen for the current day.
COOKED_PATH = DATA_DIR / "tandoor-cooked.json"
PICKS_PATH = DATA_DIR / "tandoor-picks.json"
# Cooldown: don't suggest recipes cooked within this many days
TANDOOR_COOLDOWN_DAYS = int(os.getenv("TANDOOR_COOLDOWN_DAYS", "14"))
def _today_str() -> str:
    """Return today's date as ``YYYY-MM-DD``.

    The date is taken in the Europe/Budapest time zone; if that lookup fails
    (e.g. no tzdata available) UTC is used instead.
    """
    try:
        now = datetime.now(ZoneInfo("Europe/Budapest"))
    except Exception:
        now = datetime.now(timezone.utc)
    return now.strftime("%Y-%m-%d")
def _load_json(path: Path, default):
    """Read and decode the JSON file at ``path``.

    Returns ``default`` when the file does not exist or cannot be read or
    decoded.  Loading stays best-effort, but a failure is printed so that a
    corrupt state file does not go unnoticed.
    """
    try:
        if path.exists():
            return json.loads(path.read_text(encoding="utf-8"))
    except Exception as e:
        print(f"Error loading JSON from {path}: {e}")
    return default
def _save_json(path: Path, obj) -> None:
    """Write ``obj`` as pretty-printed UTF-8 JSON to ``path``.

    The data is first written to a sibling file with an extra ``.tmp``
    suffix which then replaces ``path``, so readers never see a partially
    written file.
    """
    scratch = path.with_suffix(path.suffix + ".tmp")
    payload = json.dumps(obj, ensure_ascii=False, indent=2)
    scratch.write_text(payload, encoding="utf-8")
    scratch.replace(path)
def _tandoor_headers():
    """Build the HTTP headers for Tandoor API requests.

    The bearer token is read from the ``TANDOOR_TOKEN`` environment variable.

    Raises:
        HTTPException: status 500 when the token is not set.
    """
    token = os.getenv("TANDOOR_TOKEN", "")
    if token:
        return {"Authorization": f"Bearer {token}", "Accept": "application/json"}
    raise HTTPException(status_code=500, detail="TANDOOR_TOKEN is not set")
def _rewrite_to_public(url: str) -> str:
    """Swap the internal Tandoor base URL of ``url`` for the public one.

    Used for images/links handed to the browser.  The URL is returned
    unchanged when it is empty, when either base URL is not configured, or
    when it does not start with the internal base URL.
    """
    if not url:
        return url
    bases_configured = bool(TANDOOR_PUBLIC_URL and TANDOOR_INTERNAL_URL)
    if bases_configured and url.startswith(TANDOOR_INTERNAL_URL):
        remainder = url[len(TANDOOR_INTERNAL_URL):]
        return TANDOOR_PUBLIC_URL + remainder
    return url
def _fetch_all_recipes() -> list[dict]:
    """Fetch every recipe from the Tandoor API, following pagination.

    Starts at ``/api/recipe/?page_size=200`` and follows the ``next`` link of
    each page; at most 100 pages are requested as a safety limit.

    Raises:
        HTTPException: 500 when ``TANDOOR_INTERNAL_URL`` or the token is not
            configured; 502 when Tandoor cannot be reached, answers with a
            non-200 status, or returns a body that is not JSON.
    """
    if not TANDOOR_INTERNAL_URL:
        raise HTTPException(status_code=500, detail="TANDOOR_INTERNAL_URL is not set")
    # The headers are the same for every page: build them once.
    headers = _tandoor_headers()
    url = f"{TANDOOR_INTERNAL_URL}/api/recipe/?page_size=200"
    out: list[dict] = []
    for _ in range(100):  # safety
        try:
            r = requests.get(url, headers=headers, timeout=15)
        except requests.RequestException as e:
            # Report an unreachable upstream as a gateway error rather than
            # letting it surface as an unexplained 500.
            raise HTTPException(status_code=502, detail=f"Tandoor request failed: {e}") from e
        if r.status_code != 200:
            raise HTTPException(status_code=502, detail=f"Tandoor returned {r.status_code}: {r.text[:200]}")
        try:
            j = r.json()
        except ValueError as e:
            raise HTTPException(status_code=502, detail=f"Tandoor returned invalid JSON: {e}") from e
        # "or []" also covers an explicit null in the payload.
        out.extend(j.get("results") or [])
        url = j.get("next")
        if not url:
            break
    return out
# ================================
# Cooked History Management
# ================================
# Structure: {"<recipe_id>": ["2026-01-22", "2026-01-15", ...], ...}
def _load_cooked_history() -> dict[str, list[str]]:
    """Return the cooked history as ``{recipe_id_str: [date_str, ...]}``.

    An empty dict is returned when no history could be loaded.
    """
    history = _load_json(COOKED_PATH, {})
    return history
def _save_cooked_history(history: dict[str, list[str]]) -> None:
    """Persist the cooked history to ``COOKED_PATH``."""
    _save_json(path=COOKED_PATH, obj=history)
def _get_cooked_count(history: dict, recipe_id: int) -> int:
    """Return how many cook dates ``history`` holds for ``recipe_id``.

    History keys are strings, so the id is converted before the lookup;
    an unknown recipe counts as 0.
    """
    cooked_on = history.get(str(recipe_id), [])
    return len(cooked_on)
def _get_recently_cooked_ids(history: dict, days: int) -> set[int]:
    """Return the ids of recipes cooked within the last ``days`` days.

    A recipe counts as recent when at least one of its dates is on or after
    the cutoff (now minus ``days``, Europe/Budapest with UTC fallback).
    Dates are ``YYYY-MM-DD`` strings, so plain string comparison orders them
    chronologically.  ``days <= 0`` disables the check and yields an empty
    set.
    """
    if days <= 0:
        return set()
    try:
        tz = ZoneInfo("Europe/Budapest")
    except Exception:
        tz = timezone.utc
    cutoff = (datetime.now(tz) - timedelta(days=days)).strftime("%Y-%m-%d")
    return {
        int(rid)
        for rid, dates in history.items()
        if any(day >= cutoff for day in dates)
    }
def _record_cooked(recipe_id: int) -> dict[str, list[str]]:
    """Add today's date to the cooked history of ``recipe_id``.

    The same day is stored only once per recipe and each recipe's dates are
    kept sorted newest first.  The history is saved and returned.
    """
    history = _load_cooked_history()
    key = str(recipe_id)
    today = _today_str()
    dates = history.setdefault(key, [])
    if today not in dates:
        dates.append(today)
    # Newest first, for easier reading of the JSON file.
    history[key] = sorted(dates, reverse=True)
    _save_cooked_history(history)
    return history
# ================================
# Daily Picks Logic
# ================================
def _compute_daily_picks(count: int, cooldown_days: int) -> dict:
    """Return today's picks document, generating a new one when needed.

    The stored document ``{"date", "count", "cooldown", "ids"}`` is reused
    when it was created today with the same ``count`` and ``cooldown_days``
    and either already holds ``count`` ids or no further eligible recipe
    exists.  Otherwise ``count`` recipes are drawn at random from those not
    cooked within the cooldown (from all recipes if every one is in
    cooldown), and the result is saved to ``PICKS_PATH``.

    The recipe list is fetched from Tandoor at most once per call.
    """
    today = _today_str()
    history = _load_cooked_history()
    recently_cooked = _get_recently_cooked_ids(history, cooldown_days)
    picks_doc = _load_json(PICKS_PATH, {})
    if not isinstance(picks_doc, dict):
        # A damaged picks file must not break the endpoint: start over.
        picks_doc = {}

    recipes = None  # fetched lazily and reused below
    same_settings = (
        picks_doc.get("date") == today
        and picks_doc.get("count") == count
        and picks_doc.get("cooldown") == cooldown_days
    )
    if same_settings:
        current_ids = set(picks_doc.get("ids", []))
        if len(current_ids) >= count:
            return picks_doc
        # Fewer picks than requested: regenerate only if another eligible
        # recipe exists (e.g. recipes added mid-day).
        recipes = _fetch_all_recipes()
        can_add_more = any(
            r.get("id") is not None
            and r.get("id") not in current_ids
            and r.get("id") not in recently_cooked
            for r in recipes
        )
        if not can_add_more:
            return picks_doc

    # Generate new picks
    if recipes is None:
        recipes = _fetch_all_recipes()
    # Only recipes with an id can be picked; filtering before sampling keeps
    # id-less entries from using up one of the ``count`` slots.
    with_id = [r for r in recipes if r.get("id") is not None]
    available = [r for r in with_id if r["id"] not in recently_cooked]
    if not available:
        # Fallback: if everything is in cooldown, ignore cooldown
        available = with_id
    chosen = random.sample(available, k=min(count, len(available)))
    picks_doc = {
        "date": today,
        "count": count,
        "cooldown": cooldown_days,
        "ids": [r["id"] for r in chosen],
    }
    _save_json(PICKS_PATH, picks_doc)
    return picks_doc
def _build_items_from_ids(ids: list[int], history: dict) -> tuple[list[dict], int]:
    """Turn picked recipe ids into widget items.

    Recipes are fetched from Tandoor; ids that no longer exist are skipped.
    Each item carries ``id``, ``name``, ``image`` (public URL), ``url``
    (public recipe page), ``cook_url`` (link that marks the recipe as
    cooked, including the helper key when one is configured) and
    ``cooked_count``.  ``url`` / ``cook_url`` are empty strings when the
    corresponding public base URL is not configured.

    Returns ``(items, total_number_of_recipes)``.
    """
    recipes = _fetch_all_recipes()
    by_id = {}
    for recipe in recipes:
        recipe_id = recipe.get("id")
        if recipe_id is not None:
            by_id[recipe_id] = recipe

    items = []
    for rid in ids:
        recipe = by_id.get(rid)
        if not recipe:
            continue
        image = _rewrite_to_public(recipe.get("image") or "")
        if TANDOOR_PUBLIC_URL:
            page_url = f"{TANDOOR_PUBLIC_URL}/recipe/{rid}"
        else:
            page_url = ""
        query = {"id": rid}
        if GLANCE_HELPER_KEY:
            query["key"] = GLANCE_HELPER_KEY
        if GLANCE_HELPER_PUBLIC_URL:
            cook_url = f"{GLANCE_HELPER_PUBLIC_URL}/tandoor/cook?{urlencode(query)}"
        else:
            cook_url = ""
        items.append({
            "id": rid,
            "name": recipe.get("name") or "",
            "image": image,
            "url": page_url,
            "cook_url": cook_url,
            "cooked_count": _get_cooked_count(history, rid)
        })
    return items, len(recipes)
@APP.get("/tandoor/daily")
def tandoor_daily(count: int = Query(3, ge=1, le=10), cooldown: int = Query(None, ge=0, le=365)):
    """
    Return today's meal suggestions.

    - count: number of suggestions (1-10, default 3)
    - cooldown: days during which a cooked meal is not suggested again
      (0-365; when omitted, TANDOOR_COOLDOWN_DAYS is used)

    The response holds the picks date, the total number of recipes, the
    cooldown that was applied and the suggestion items.
    """
    if cooldown is None:
        cooldown_days = TANDOOR_COOLDOWN_DAYS
    else:
        cooldown_days = cooldown
    picks_doc = _compute_daily_picks(count, cooldown_days)
    picked_ids = picks_doc.get("ids", [])
    history = _load_cooked_history()
    items, total = _build_items_from_ids(picked_ids, history)
    response = {
        "date": picks_doc.get("date"),
        "total_recipes": total,
        "cooldown_days": cooldown_days,
        "items": items
    }
    return response
@APP.get("/tandoor/cook")
def tandoor_cook(id: int, key: str | None = None, redirect: str | None = None):
    """Mark recipe ``id`` as cooked today.

    When ``GLANCE_HELPER_KEY`` is configured, ``key`` must match it (403
    otherwise).  The cook is recorded in the history, the recipe is removed
    from today's picks and, if that leaves fewer picks than the stored
    count, one random recipe outside the default cooldown is added.

    Responds with a 302 to ``redirect`` when given, else with a JSON summary.
    """
    if GLANCE_HELPER_KEY and key != GLANCE_HELPER_KEY:
        raise HTTPException(status_code=403, detail="Invalid key")

    today = _today_str()
    # Record in permanent history
    history = _record_cooked(id)
    recently_cooked = _get_recently_cooked_ids(history, TANDOOR_COOLDOWN_DAYS)

    # Remove from today's picks and try to refill
    picks = _load_json(PICKS_PATH, {})
    picks_are_current = picks.get("date") == today and isinstance(picks.get("ids"), list)
    if picks_are_current:
        remaining = [picked for picked in picks["ids"] if picked != id]
        target = int(picks.get("count") or len(remaining))
        if len(remaining) < target:
            recipes = _fetch_all_recipes()
            blocked = set(remaining) | recently_cooked
            pool = []
            for recipe in recipes:
                candidate = recipe.get("id")
                if candidate is not None and candidate not in blocked:
                    pool.append(candidate)
            if pool:
                remaining.append(random.choice(pool))
        picks["ids"] = remaining[:target]
        _save_json(PICKS_PATH, picks)

    if redirect:
        # NOTE(review): the redirect target is taken from the query string
        # without validation - confirm that this is acceptable.
        return RedirectResponse(url=redirect, status_code=302)
    summary = {
        "ok": True,
        "date": today,
        "cooked_id": id,
        "cooked_total": _get_cooked_count(history, id)
    }
    return summary
@APP.get("/tandoor/history")
def tandoor_history():
    """Return the full cooked history (for debugging/stats).

    ``recipes`` maps each recipe id (as an int) to its dates and their
    count; ``total_cooks`` is the number of recorded dates over all recipes.
    """
    history = _load_cooked_history()
    per_recipe = {}
    for rid, dates in history.items():
        per_recipe[int(rid)] = {"dates": dates, "count": len(dates)}
    total_cooks = 0
    for dates in history.values():
        total_cooks += len(dates)
    return {"recipes": per_recipe, "total_cooks": total_cooks}
# ================================
# Version Checker - Container Version Monitoring
# ================================
# Scrapes version-checker Prometheus metrics, parses versions,
# and returns properly sorted results with the NEWEST available version.
# Metrics endpoint of version-checker that _fetch_version_checker_data() reads.
VERSION_CHECKER_URL = os.getenv("VERSION_CHECKER_URL", "http://version-checker.version-checker-system.svc.cluster.local:8080/metrics")
# Regex patterns for filtering (same as in Glance widget)
# Images whose name matches this regular expression (re.search) are left out
# of the report; an empty value disables the filter.
VERSION_CHECKER_EXCLUDE_IMAGES = os.getenv("VERSION_CHECKER_EXCLUDE_IMAGES",
r"(^|.*/)(busybox|redis|alpine|nginx|mariadb|mysql|valkey)$|^longhornio.*$|(^|.*/)postgres.*$|^registry\.k8s\.io/ingress-nginx/.*$"
)
def _parse_semver(version: str) -> tuple:
    """
    Turn a version string into a tuple that sorts in version order.

    Accepts forms such as v1.2.3, 1.2.3, v1.2.3-beta, 1.2.3-20251030 and
    1.2.3+build.  The result is
    (major, minor, patch, prerelease_flag, prerelease):

    - missing or non-numeric major/minor/patch segments count as 0
    - everything after the first "-" (or, if there is none, the first "+")
      is the prerelease text
    - prerelease_flag is 1 without prerelease text and 0 with it, so that
      1.2.3 sorts above 1.2.3-beta

    An empty string gives (0, 0, 0, 0, ""); digests (anything containing
    "sha256:" or starting with "sha") give (0, 0, 0, 0, version).
    """
    if not version:
        return (0, 0, 0, 0, "")
    # Digests cannot be ordered: park them at the bottom.
    if "sha256:" in version or version.startswith("sha"):
        return (0, 0, 0, 0, version)

    # Drop the leading "v" and any "@..." suffix.
    core = version.lstrip("v")
    if "@" in core:
        core = core.split("@")[0]

    # Separate the numeric core from the prerelease/build text; "-" takes
    # precedence over "+".
    suffix = ""
    for separator in ("-", "+"):
        if separator in core:
            core, suffix = core.split(separator, 1)
            break

    parts = core.split(".")

    def number_at(index):
        """Numeric value of segment ``index``; 0 if absent or not digits."""
        if len(parts) > index and parts[index].isdigit():
            return int(parts[index])
        return 0

    try:
        major, minor, patch = number_at(0), number_at(1), number_at(2)
    except (ValueError, IndexError):
        major, minor, patch = 0, 0, 0

    return (major, minor, patch, 0 if suffix else 1, suffix)
def _compare_versions(v1: str, v2: str) -> int:
    """Order two version strings by their ``_parse_semver`` tuples.

    Returns -1 if v1 < v2, 0 if they compare equal, 1 if v1 > v2.
    """
    left, right = _parse_semver(v1), _parse_semver(v2)
    return (left > right) - (left < right)
def _parse_prometheus_metrics(text: str) -> list[dict]:
    """
    Extract the ``version_checker_is_latest_version`` samples from
    Prometheus text-format output.

    Each sample becomes ``{"labels": {name: value, ...}, "value": float}``,
    where value 1 means up to date and 0 means outdated.  Other metrics and
    comment lines are ignored.
    """
    sample_re = re.compile(
        r'version_checker_is_latest_version\{([^}]+)\}\s+(\d+(?:\.\d+)?)'
    )
    # Labels look like: container="foo",image="bar",current_version="1.0"
    label_re = re.compile(r'(\w+)="([^"]*)"')
    return [
        {
            "labels": dict(label_re.findall(sample.group(1))),
            "value": float(sample.group(2)),  # 1 = up to date, 0 = outdated
        }
        for sample in sample_re.finditer(text)
    ]
def _fetch_version_checker_data() -> dict:
    """
    Download the version-checker metrics and summarise them per image.

    Only samples with ``container_type="container"`` are used; images that
    match ``VERSION_CHECKER_EXCLUDE_IMAGES`` and samples whose current or
    latest version is a sha256 digest are skipped.  An image is outdated as
    soon as one of its samples is not 1; its reported latest version is the
    highest (by ``_parse_semver``) of all latest versions seen for it.

    Returns a dict with ``fetched_at_unix``, ``summary`` (counts) and the
    ``outdated`` / ``up_to_date`` lists, both sorted by image name.

    Raises:
        HTTPException: status 502 when fetching or parsing the metrics fails.
    """
    try:
        resp = requests.get(VERSION_CHECKER_URL, timeout=30)
        resp.raise_for_status()
        samples = _parse_prometheus_metrics(resp.text)
    except Exception as e:
        print(f"Version checker fetch error: {e}")
        raise HTTPException(status_code=502, detail=f"Failed to fetch version-checker: {e}")

    excluded = re.compile(VERSION_CHECKER_EXCLUDE_IMAGES) if VERSION_CHECKER_EXCLUDE_IMAGES else None

    # image -> {"current_version": str, "latest_versions": [str], "is_latest": bool}
    images: Dict[str, Dict] = {}
    for sample in samples:
        labels = sample["labels"]
        image = labels.get("image", "")
        current = labels.get("current_version", "")
        latest = labels.get("latest_version", "")

        # Regular containers only (init containers are left out).
        if labels.get("container_type", "") != "container":
            continue
        if excluded and excluded.search(image):
            continue
        # Digests can't be compared meaningfully.
        if "sha256:" in current or "sha256:" in latest:
            continue

        # The first sample of an image decides its current version.
        entry = images.setdefault(image, {
            "current_version": current,
            "latest_versions": [],
            "is_latest": True
        })
        if sample["value"] != 1:
            entry["is_latest"] = False
            known = entry["latest_versions"]
            if latest and latest not in known:
                known.append(latest)

    up_to_date = []
    outdated = []
    for image, entry in images.items():
        current = entry["current_version"]
        if entry["is_latest"]:
            up_to_date.append({
                "image": image,
                "current_version": current,
                "latest_version": current
            })
            continue
        candidates = entry["latest_versions"]
        # Highest candidate wins; without candidates fall back to current.
        newest = max(candidates, key=_parse_semver) if candidates else current
        outdated.append({
            "image": image,
            "current_version": current,
            "latest_version": newest,
            "all_newer_versions": len(candidates)
        })

    # Sort by image name for consistent ordering.
    for group in (outdated, up_to_date):
        group.sort(key=lambda item: item["image"])

    return {
        "fetched_at_unix": int(time.time()),
        "summary": {
            "up_to_date": len(up_to_date),
            "outdated": len(outdated),
            "total": len(up_to_date) + len(outdated)
        },
        "outdated": outdated,
        "up_to_date": up_to_date
    }
@APP.get("/versions")
def versions_api():
    """
    Return the complete container version report as UTF-8 JSON: summary
    counts plus the outdated and up-to-date image lists.
    """
    body = json.dumps(_fetch_version_checker_data(), ensure_ascii=False)
    return Response(content=body, media_type="application/json; charset=utf-8")
@APP.get("/versions/outdated")
def versions_outdated():
    """Return the version report restricted to outdated images.

    The JSON holds ``fetched_at_unix``, ``summary`` and the outdated entries
    under the key ``images``.
    """
    report = _fetch_version_checker_data()
    trimmed = {
        "fetched_at_unix": report["fetched_at_unix"],
        "summary": report["summary"],
        "images": report["outdated"]
    }
    body = json.dumps(trimmed, ensure_ascii=False)
    return Response(content=body, media_type="application/json; charset=utf-8")
---
# Runs the helper application: a stock python image that installs its
# dependencies at start-up and serves app.py (mounted from the
# glance-helper-app ConfigMap at /app) with uvicorn on port 8000.
apiVersion: apps/v1
kind: Deployment
metadata:
annotations:
argocd.argoproj.io/tracking-id: glance:apps/Deployment:glance-system/glance-helper
reloader.stakater.com/auto: "true"
name: glance-helper
namespace: glance-system
spec:
progressDeadlineSeconds: 600
replicas: 1
revisionHistoryLimit: 10
selector:
matchLabels:
app: glance-helper
# NOTE(review): Recreate stops the old pod before the new one starts -
# presumably required because the data volume is ReadWriteOnce; confirm.
strategy:
type: Recreate
template:
metadata:
labels:
app: glance-helper
app.kubernetes.io/name: glance-helper
spec:
containers:
# NOTE(review): the start command below runs apt-get and an unpinned
# "pip install" on every container start, so start-up needs network access
# and may pull different package versions each time; consider a prebuilt
# image with pinned dependencies.
- args:
- |-
set -eux;
export DEBIAN_FRONTEND=noninteractive;
apt-get update;
apt-get install -y --no-install-recommends curl ca-certificates iputils-ping dnsutils tzdata net-tools;
rm -rf /var/lib/apt/lists/*;
pip install --no-cache-dir fastapi uvicorn requests beautifulsoup4 prometheus-client;
python -c "import uvicorn; uvicorn.run('app:APP', host='0.0.0.0', port=8000)"
command:
- /bin/sh
- -lc
env:
- name: IDOKEP_URL
value: https://www.idokep.hu/idojaras/Budapest%20VII.%20ker
- name: PLACE_NAME
value: Budapest VII. ker
- name: TANDOOR_INTERNAL_URL
value: http://tandoor.tandoor-system.svc.cluster.local:8080
- name: TANDOOR_PUBLIC_URL
value: https://tandoor.dooplex.hu
# SECURITY(review): API token stored in plain text in the manifest. Move it
# to a Secret referenced through valueFrom.secretKeyRef and rotate the token.
- name: TANDOOR_TOKEN
value: tda_8a8b169c_5d1f_4962_83a2_0f2719c7d61a
- name: GLANCE_HELPER_PUBLIC_URL
value: https://glance-helper.dooplex.hu
- name: DATA_DIR
value: /data
# SECURITY(review): shared access key stored in plain text in the manifest.
# Move it to a Secret referenced through valueFrom.secretKeyRef and rotate it.
- name: GLANCE_HELPER_KEY
value: oplQqnLnJK2vErRVYJpvVUcSDBOSbCHZSbsYY2bwSifgTMfT
- name: TZ
value: Europe/Budapest
# Version Checker configuration
- name: VERSION_CHECKER_URL
value: http://version-checker.version-checker-system.svc.cluster.local:8080/metrics
- name: VERSION_CHECKER_EXCLUDE_IMAGES
value: "(^|.*/)(busybox|redis|alpine|nginx|mariadb|mysql|valkey)$|^longhornio.*$|(^|.*/)postgres.*$|^registry\\.k8s\\.io/ingress-nginx/.*$"
image: python:3.12-bookworm
imagePullPolicy: IfNotPresent
name: glance-helper
ports:
- containerPort: 8000
protocol: TCP
# NOTE(review): no resource requests/limits are set.
resources: {}
terminationMessagePath: /dev/termination-log
terminationMessagePolicy: File
volumeMounts:
# Application code (app.py from the ConfigMap).
- mountPath: /app
name: app
# Persistent state (matches DATA_DIR above).
- mountPath: /data
name: data
workingDir: /app
dnsPolicy: ClusterFirst
restartPolicy: Always
schedulerName: default-scheduler
securityContext: {}
terminationGracePeriodSeconds: 30
volumes:
- configMap:
# 420 decimal is file mode 0644.
defaultMode: 420
name: glance-helper-app
name: app
- name: data
persistentVolumeClaim:
claimName: glance-helper-data
---
# Cluster-internal Service in front of the glance-helper pods (port 8000);
# the Ingress in this file routes to it.
apiVersion: v1
kind: Service
metadata:
name: glance-helper
namespace: glance-system
spec:
selector:
app.kubernetes.io/name: glance-helper
ports:
- name: http
port: 8000
targetPort: 8000
---
# Second Service name for the same glance-helper pods (identical selector
# and port).
# NOTE(review): presumably kept so that consumers of the former
# "idokep-scraper" name keep working - confirm before removing.
apiVersion: v1
kind: Service
metadata:
name: idokep-scraper
namespace: glance-system
spec:
selector:
app.kubernetes.io/name: glance-helper
ports:
- name: http
port: 8000
targetPort: 8000
---
# Exposes the glance-helper Service at https://glance-helper.dooplex.hu
# through the "nginx-internal" ingress class.
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: glance-helper
namespace: glance-system
annotations:
# Certificate issuer, DNS record hostname and forced HTTPS redirect.
cert-manager.io/cluster-issuer: letsencrypt-prod
external-dns.alpha.kubernetes.io/hostname: glance-helper.dooplex.hu
nginx.ingress.kubernetes.io/ssl-redirect: "true"
spec:
ingressClassName: nginx-internal
rules:
- host: glance-helper.dooplex.hu
http:
paths:
# Every path of the host is forwarded to the Service on port 8000.
- path: /
pathType: Prefix
backend:
service:
name: glance-helper
port:
number: 8000
tls:
- hosts:
- glance-helper.dooplex.hu
# Secret that holds the TLS certificate for the host.
secretName: glance-helper-tls