3524f372cc
Both APIs require numeric quantity/amount fields. When a range like "2-3" is detected, uses the first number as the quantity and puts the full range (e.g. "2- 3 ek") in the note field. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
268 lines
9.0 KiB
Python
268 lines
9.0 KiB
Python
"""Tandoor API client — creates recipes and uploads images."""
|
|
|
|
import io
|
|
import re
|
|
import requests
|
|
|
|
|
|
def _parse_qty(qty_str: str) -> tuple[float, str]:
|
|
"""Parse a quantity string, handling ranges like '2-3' or '6- 8'.
|
|
|
|
Returns (number, range_note) where range_note is the original range
|
|
string if the quantity is a range, or empty string if it's a plain number.
|
|
"""
|
|
if not qty_str:
|
|
return (0, "")
|
|
try:
|
|
return (float(qty_str.replace(",", ".")), "")
|
|
except (ValueError, TypeError):
|
|
pass
|
|
m = re.match(r"^(\d+(?:[.,]\d+)?)\s*-\s*(\d+(?:[.,]\d+)?)$", qty_str.strip())
|
|
if m:
|
|
first = float(m.group(1).replace(",", "."))
|
|
return (first, qty_str.strip())
|
|
return (0, "")
|
|
|
|
|
|
class TandoorClient:
|
|
"""Thin wrapper around the Tandoor REST API."""
|
|
|
|
def __init__(self, base_url: str, api_key: str, api_url: str = ""):
|
|
self.base_url = base_url.rstrip("/")
|
|
self.api_url = api_url.rstrip("/") if api_url else self.base_url
|
|
self.session = requests.Session()
|
|
self.session.headers.update({
|
|
"Authorization": f"Bearer {api_key}",
|
|
"Accept": "application/json",
|
|
})
|
|
|
|
# ------------------------------------------------------------------
|
|
# Public
|
|
# ------------------------------------------------------------------
|
|
|
|
def test_connection(self) -> dict:
|
|
"""Return Tandoor version info or raise on failure."""
|
|
# Try the recipe list endpoint as a connection test
|
|
r = self.session.get(
|
|
f"{self.api_url}/api/recipe/",
|
|
params={"limit": 1},
|
|
timeout=10,
|
|
)
|
|
r.raise_for_status()
|
|
|
|
# Try to get version from openapi endpoint
|
|
version = "unknown"
|
|
try:
|
|
r2 = self.session.get(f"{self.api_url}/openapi/", timeout=10)
|
|
if r2.ok:
|
|
# Parse version from YAML: "version: 1.5.26"
|
|
m = re.search(r"version:\s*['\"]?([0-9][0-9a-z._-]*)", r2.text)
|
|
if m:
|
|
version = m.group(1)
|
|
except Exception:
|
|
pass
|
|
|
|
return {"version": version}
|
|
|
|
def list_keywords(self) -> list[dict]:
|
|
"""Return all keywords as [{name, id}]."""
|
|
results = []
|
|
page_url = f"{self.api_url}/api/keyword/"
|
|
params = {"limit": 100, "format": "json"}
|
|
while page_url:
|
|
r = self.session.get(page_url, params=params, timeout=10)
|
|
if not r.ok:
|
|
break
|
|
data = r.json()
|
|
results.extend({"name": k["name"], "id": k["id"]}
|
|
for k in data.get("results", []))
|
|
page_url = data.get("next")
|
|
params = {} # next URL already has params
|
|
return results
|
|
|
|
def find_duplicate(self, url: str, title: str = "") -> dict | None:
|
|
"""Check if a recipe with this source URL already exists."""
|
|
if not url and not title:
|
|
return None
|
|
search = title or url.split("/")[-1].replace("-", " ")
|
|
r = self.session.get(
|
|
f"{self.api_url}/api/recipe/",
|
|
params={"query": search, "limit": 20},
|
|
timeout=10,
|
|
)
|
|
if not r.ok:
|
|
return None
|
|
for item in r.json().get("results", []):
|
|
source = item.get("source_url") or ""
|
|
desc = item.get("description") or ""
|
|
if source == url or url in desc:
|
|
return {
|
|
"id": item["id"],
|
|
"name": item["name"],
|
|
"url": f"{self.base_url}/view/recipe/{item['id']}",
|
|
}
|
|
return None
|
|
|
|
def create_recipe(self, recipe: dict) -> dict:
|
|
"""Create a recipe in Tandoor from a scraper result dict.
|
|
|
|
Returns {"id": int, "url": str}.
|
|
"""
|
|
payload = self._build_payload(recipe)
|
|
r = self.session.post(
|
|
f"{self.api_url}/api/recipe/",
|
|
json=payload,
|
|
timeout=15,
|
|
)
|
|
r.raise_for_status()
|
|
data = r.json()
|
|
recipe_id = data["id"]
|
|
|
|
# Upload image if available
|
|
image_url = recipe.get("image_url")
|
|
if image_url:
|
|
try:
|
|
self._upload_image(recipe_id, image_url)
|
|
except Exception:
|
|
pass # non-fatal
|
|
|
|
return {
|
|
"id": recipe_id,
|
|
"url": f"{self.base_url}/view/recipe/{recipe_id}",
|
|
}
|
|
|
|
# ------------------------------------------------------------------
|
|
# Internal
|
|
# ------------------------------------------------------------------
|
|
|
|
def _build_payload(self, recipe: dict) -> dict:
|
|
description = recipe.get("description", "")
|
|
original_url = recipe.get("original_url", "")
|
|
if original_url and original_url not in description:
|
|
description = f"{description}\n\n{original_url}".strip()
|
|
|
|
# Build ingredients list
|
|
ingredients = []
|
|
order = 0
|
|
pending_group = ""
|
|
for item in recipe.get("ingredients", []):
|
|
if isinstance(item, dict):
|
|
if "group" in item and "food" not in item:
|
|
# Group header
|
|
ingredients.append({
|
|
"food": None,
|
|
"unit": None,
|
|
"amount": 0,
|
|
"note": item["group"],
|
|
"is_header": True,
|
|
"no_amount": True,
|
|
"order": order,
|
|
})
|
|
order += 1
|
|
else:
|
|
ingredients.append(self._build_ingredient(item, order))
|
|
order += 1
|
|
|
|
# Build steps — all ingredients on step 0
|
|
instructions = recipe.get("instructions", [])
|
|
steps = []
|
|
for i, text in enumerate(instructions):
|
|
steps.append({
|
|
"instruction": text,
|
|
"ingredients": ingredients if i == 0 else [],
|
|
"order": i,
|
|
})
|
|
|
|
if not instructions and ingredients:
|
|
steps.append({
|
|
"instruction": "",
|
|
"ingredients": ingredients,
|
|
"order": 0,
|
|
})
|
|
|
|
# Keywords (tags)
|
|
tag_names = recipe.get("tags", [])
|
|
keywords = [{"name": t} for t in tag_names] if tag_names else []
|
|
|
|
return {
|
|
"name": recipe["title"],
|
|
"description": description,
|
|
"source_url": original_url,
|
|
"keywords": keywords,
|
|
"steps": steps,
|
|
"servings": 1,
|
|
}
|
|
|
|
def _build_ingredient(self, item: dict, order: int) -> dict:
|
|
"""Build a Tandoor ingredient from a structured dict."""
|
|
qty_str = str(item.get("quantity", "")).strip()
|
|
unit_str = item.get("unit", "").strip()
|
|
food_str = item.get("food", "").strip()
|
|
extra = item.get("extra", "").strip()
|
|
|
|
has_structured = bool(qty_str or unit_str)
|
|
|
|
if has_structured and food_str:
|
|
qty, range_note = _parse_qty(qty_str)
|
|
if range_note:
|
|
extra = f"{range_note} {unit_str}; {extra}".strip("; ") if extra else f"{range_note} {unit_str}".strip()
|
|
|
|
return {
|
|
"food": {"name": food_str},
|
|
"unit": {"name": unit_str} if unit_str else None,
|
|
"amount": qty,
|
|
"note": extra,
|
|
"is_header": False,
|
|
"no_amount": False,
|
|
"order": order,
|
|
}
|
|
else:
|
|
# No quantity/unit — food-only or note
|
|
note = food_str
|
|
if extra:
|
|
note = f"{note} ({extra})" if note else extra
|
|
if food_str:
|
|
return {
|
|
"food": {"name": food_str},
|
|
"unit": None,
|
|
"amount": 0,
|
|
"note": extra,
|
|
"is_header": False,
|
|
"no_amount": True,
|
|
"order": order,
|
|
}
|
|
else:
|
|
return {
|
|
"food": None,
|
|
"unit": None,
|
|
"amount": 0,
|
|
"note": note,
|
|
"is_header": False,
|
|
"no_amount": True,
|
|
"order": order,
|
|
}
|
|
|
|
def _upload_image(self, recipe_id: int, image_url: str):
|
|
"""Download image from *image_url* and upload it to the recipe."""
|
|
img_resp = requests.get(image_url, timeout=30, headers={
|
|
"User-Agent": "RecipeImporter/1.0",
|
|
})
|
|
img_resp.raise_for_status()
|
|
|
|
content_type = img_resp.headers.get("Content-Type", "image/jpeg")
|
|
ext = "jpg"
|
|
if "png" in content_type:
|
|
ext = "png"
|
|
elif "webp" in content_type:
|
|
ext = "webp"
|
|
|
|
files = {
|
|
"image": (f"recipe.{ext}", io.BytesIO(img_resp.content), content_type),
|
|
}
|
|
r = self.session.put(
|
|
f"{self.api_url}/api/recipe/{recipe_id}/image/",
|
|
files=files,
|
|
timeout=30,
|
|
)
|
|
r.raise_for_status()
|