Files
recipe-importer/app/tandoor.py
T
admin bb89d97716 Fix Tandoor pagination: use page/page_size instead of limit/offset
Tandoor's /api/recipe/ endpoint uses Django REST Framework's
PageNumberPagination (page + page_size), not LimitOffsetPagination.
The limit/offset params were silently ignored, returning all recipes
on every page.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 08:50:49 +01:00

372 lines
13 KiB
Python

"""Tandoor API client — creates recipes and uploads images."""
import io
import re
import requests
def _parse_qty(qty_str: str) -> tuple[float, str]:
"""Parse a quantity string, handling ranges like '2-3' or '6- 8'.
Returns (number, range_note) where range_note is the original range
string if the quantity is a range, or empty string if it's a plain number.
"""
if not qty_str:
return (0, "")
try:
return (float(qty_str.replace(",", ".")), "")
except (ValueError, TypeError):
pass
m = re.match(r"^(\d+(?:[.,]\d+)?)\s*-\s*(\d+(?:[.,]\d+)?)$", qty_str.strip())
if m:
first = float(m.group(1).replace(",", "."))
return (first, qty_str.strip())
return (0, "")
class TandoorClient:
"""Thin wrapper around the Tandoor REST API."""
def __init__(self, base_url: str, api_key: str, api_url: str = ""):
self.base_url = base_url.rstrip("/")
self.api_url = api_url.rstrip("/") if api_url else self.base_url
self.session = requests.Session()
self.session.headers.update({
"Authorization": f"Bearer {api_key}",
"Accept": "application/json",
})
# ------------------------------------------------------------------
# Public
# ------------------------------------------------------------------
def test_connection(self) -> dict:
"""Return Tandoor version info or raise on failure."""
# Try the recipe list endpoint as a connection test
r = self.session.get(
f"{self.api_url}/api/recipe/",
params={"limit": 1},
timeout=10,
)
r.raise_for_status()
# Try to get version from openapi endpoint
version = "unknown"
try:
r2 = self.session.get(f"{self.api_url}/openapi/", timeout=10)
if r2.ok:
# Parse version from YAML: "version: 1.5.26"
m = re.search(r"version:\s*['\"]?([0-9][0-9a-z._-]*)", r2.text)
if m:
version = m.group(1)
except Exception:
pass
return {"version": version}
def list_keywords(self) -> list[dict]:
"""Return all keywords as [{name, id}]."""
results = []
page_url = f"{self.api_url}/api/keyword/"
params = {"limit": 100, "format": "json"}
while page_url:
r = self.session.get(page_url, params=params, timeout=10)
if not r.ok:
break
data = r.json()
results.extend({"name": k["name"], "id": k["id"]}
for k in data.get("results", []))
page_url = data.get("next")
params = {} # next URL already has params
return results
def list_recipes(self, search: str = "", keyword_ids: list[int] | None = None,
page: int = 1, per_page: int = 50) -> dict:
"""List recipes with optional search and keyword filtering.
Returns {"items": [...], "page": int, "per_page": int, "total": int}.
"""
params: dict = {
"page": page,
"page_size": per_page,
"format": "json",
}
if search:
params["query"] = search
if keyword_ids:
params["keywords"] = keyword_ids
r = self.session.get(f"{self.api_url}/api/recipe/", params=params,
timeout=15)
r.raise_for_status()
data = r.json()
items = []
for item in data.get("results", []):
items.append({
"id": item["id"],
"name": item.get("name", ""),
"image": item.get("image") or "",
"tags": [k.get("name", "") for k in item.get("keywords", []) if k.get("name")],
"url": f"{self.base_url}/view/recipe/{item['id']}",
})
return {
"items": items,
"page": page,
"per_page": per_page,
"total": data.get("count", 0),
}
def get_recipe(self, recipe_id: int) -> dict:
"""Fetch a single recipe and return it in the common format."""
r = self.session.get(f"{self.api_url}/api/recipe/{recipe_id}/",
timeout=15)
r.raise_for_status()
data = r.json()
# Parse ingredients from all steps
ingredients = []
for step in data.get("steps", []):
for ing in step.get("ingredients", []):
if ing.get("is_header"):
ingredients.append({"group": ing.get("note", "")})
continue
food_name = ""
if ing.get("food"):
food_name = ing["food"].get("name", "")
unit_name = ""
if ing.get("unit"):
unit_name = ing["unit"].get("name", "")
amount = ing.get("amount", 0)
qty_str = ""
if amount:
qty_str = str(int(amount)) if amount == int(amount) else str(amount)
note = ing.get("note", "")
if food_name or qty_str or unit_name:
ingredients.append({
"quantity": qty_str,
"unit": unit_name,
"food": food_name,
"extra": note,
})
# Parse instructions
instructions = []
for step in data.get("steps", []):
text = step.get("instruction", "").strip()
if text:
instructions.append(text)
# Parse tags
tags = [k.get("name", "") for k in data.get("keywords", []) if k.get("name")]
return {
"title": data.get("name", ""),
"description": data.get("description", ""),
"image_url": data.get("image") or "",
"ingredients": ingredients,
"instructions": instructions,
"tags": tags,
"original_url": data.get("source_url", ""),
}
def update_recipe(self, recipe_id: int, recipe: dict) -> None:
"""Update an existing recipe from a common-format dict."""
payload = self._build_payload(recipe)
r = self.session.put(
f"{self.api_url}/api/recipe/{recipe_id}/",
json=payload,
timeout=15,
)
r.raise_for_status()
def delete_recipe(self, recipe_id: int) -> None:
"""Delete a recipe by ID."""
r = self.session.delete(f"{self.api_url}/api/recipe/{recipe_id}/",
timeout=10)
r.raise_for_status()
def find_duplicate(self, url: str, title: str = "") -> dict | None:
"""Check if a recipe with this source URL already exists."""
if not url and not title:
return None
search = title or url.split("/")[-1].replace("-", " ")
r = self.session.get(
f"{self.api_url}/api/recipe/",
params={"query": search, "limit": 20},
timeout=10,
)
if not r.ok:
return None
for item in r.json().get("results", []):
source = item.get("source_url") or ""
desc = item.get("description") or ""
if source == url or url in desc:
return {
"id": item["id"],
"name": item.get("name", ""),
"url": f"{self.base_url}/view/recipe/{item['id']}",
}
return None
def create_recipe(self, recipe: dict) -> dict:
"""Create a recipe in Tandoor from a scraper result dict.
Returns {"id": int, "url": str}.
"""
payload = self._build_payload(recipe)
r = self.session.post(
f"{self.api_url}/api/recipe/",
json=payload,
timeout=15,
)
r.raise_for_status()
data = r.json()
recipe_id = data["id"]
# Upload image if available
image_url = recipe.get("image_url")
if image_url:
try:
self._upload_image(recipe_id, image_url)
except Exception:
pass # non-fatal
return {
"id": recipe_id,
"url": f"{self.base_url}/view/recipe/{recipe_id}",
}
# ------------------------------------------------------------------
# Internal
# ------------------------------------------------------------------
def _build_payload(self, recipe: dict) -> dict:
description = recipe.get("description", "")
original_url = recipe.get("original_url", "")
if original_url and original_url not in description:
description = f"{description}\n\n{original_url}".strip()
# Build ingredients list
ingredients = []
order = 0
pending_group = ""
for item in recipe.get("ingredients", []):
if isinstance(item, dict):
if "group" in item and "food" not in item:
# Group header
ingredients.append({
"food": None,
"unit": None,
"amount": 0,
"note": item["group"],
"is_header": True,
"no_amount": True,
"order": order,
})
order += 1
else:
ingredients.append(self._build_ingredient(item, order))
order += 1
# Build steps — all ingredients on step 0
instructions = recipe.get("instructions", [])
steps = []
for i, text in enumerate(instructions):
steps.append({
"instruction": text,
"ingredients": ingredients if i == 0 else [],
"order": i,
})
if not instructions and ingredients:
steps.append({
"instruction": "",
"ingredients": ingredients,
"order": 0,
})
# Keywords (tags)
tag_names = recipe.get("tags", [])
keywords = [{"name": t} for t in tag_names] if tag_names else []
return {
"name": recipe["title"],
"description": description,
"source_url": original_url,
"keywords": keywords,
"steps": steps,
"servings": 1,
}
def _build_ingredient(self, item: dict, order: int) -> dict:
"""Build a Tandoor ingredient from a structured dict."""
qty_str = str(item.get("quantity", "")).strip()
unit_str = item.get("unit", "").strip()
food_str = item.get("food", "").strip()
extra = item.get("extra", "").strip()
has_structured = bool(qty_str or unit_str)
if has_structured and food_str:
qty, range_note = _parse_qty(qty_str)
if range_note:
extra = f"{range_note} {unit_str}; {extra}".strip("; ") if extra else f"{range_note} {unit_str}".strip()
return {
"food": {"name": food_str},
"unit": {"name": unit_str} if unit_str else None,
"amount": qty,
"note": extra,
"is_header": False,
"no_amount": False,
"order": order,
}
else:
# No quantity/unit — food-only or note
note = food_str
if extra:
note = f"{note} ({extra})" if note else extra
if food_str:
return {
"food": {"name": food_str},
"unit": None,
"amount": 0,
"note": extra,
"is_header": False,
"no_amount": True,
"order": order,
}
else:
return {
"food": None,
"unit": None,
"amount": 0,
"note": note,
"is_header": False,
"no_amount": True,
"order": order,
}
def _upload_image(self, recipe_id: int, image_url: str):
"""Download image from *image_url* and upload it to the recipe."""
img_resp = requests.get(image_url, timeout=30, headers={
"User-Agent": "RecipeImporter/1.0",
})
img_resp.raise_for_status()
content_type = img_resp.headers.get("Content-Type", "image/jpeg")
ext = "jpg"
if "png" in content_type:
ext = "png"
elif "webp" in content_type:
ext = "webp"
files = {
"image": (f"recipe.{ext}", io.BytesIO(img_resp.content), content_type),
}
r = self.session.put(
f"{self.api_url}/api/recipe/{recipe_id}/image/",
files=files,
timeout=30,
)
r.raise_for_status()