"""Tandoor API client — creates recipes and uploads images.""" import io import re import requests def _parse_qty(qty_str: str) -> tuple[float, str]: """Parse a quantity string, handling ranges like '2-3' or '6- 8'. Returns (number, range_note) where range_note is the original range string if the quantity is a range, or empty string if it's a plain number. """ if not qty_str: return (0, "") try: return (float(qty_str.replace(",", ".")), "") except (ValueError, TypeError): pass m = re.match(r"^(\d+(?:[.,]\d+)?)\s*-\s*(\d+(?:[.,]\d+)?)$", qty_str.strip()) if m: first = float(m.group(1).replace(",", ".")) return (first, qty_str.strip()) return (0, "") class TandoorClient: """Thin wrapper around the Tandoor REST API.""" def __init__(self, base_url: str, api_key: str, api_url: str = ""): self.base_url = base_url.rstrip("/") self.api_url = api_url.rstrip("/") if api_url else self.base_url self.session = requests.Session() self.session.headers.update({ "Authorization": f"Bearer {api_key}", "Accept": "application/json", }) # ------------------------------------------------------------------ # Public # ------------------------------------------------------------------ def test_connection(self) -> dict: """Return Tandoor version info or raise on failure.""" # Try the recipe list endpoint as a connection test r = self.session.get( f"{self.api_url}/api/recipe/", params={"limit": 1}, timeout=10, ) r.raise_for_status() # Try to get version from openapi endpoint version = "unknown" try: r2 = self.session.get(f"{self.api_url}/openapi/", timeout=10) if r2.ok: # Parse version from YAML: "version: 1.5.26" m = re.search(r"version:\s*['\"]?([0-9][0-9a-z._-]*)", r2.text) if m: version = m.group(1) except Exception: pass return {"version": version} def list_keywords(self) -> list[dict]: """Return all keywords as [{name, id}].""" results = [] page_url = f"{self.api_url}/api/keyword/" params = {"limit": 100, "format": "json"} while page_url: r = self.session.get(page_url, params=params, timeout=10) if not r.ok: break data = r.json() results.extend({"name": k["name"], "id": k["id"]} for k in data.get("results", [])) page_url = data.get("next") params = {} # next URL already has params return results def list_recipes(self, search: str = "", keyword_ids: list[int] | None = None, page: int = 1, per_page: int = 50) -> dict: """List recipes with optional search and keyword filtering. Returns {"items": [...], "page": int, "per_page": int, "total": int}. """ params: dict = { "limit": per_page, "offset": (page - 1) * per_page, "format": "json", } if search: params["query"] = search if keyword_ids: params["keywords"] = keyword_ids r = self.session.get(f"{self.api_url}/api/recipe/", params=params, timeout=15) r.raise_for_status() data = r.json() items = [] for item in data.get("results", []): items.append({ "id": item["id"], "name": item.get("name", ""), "image": item.get("image") or "", "tags": [k["name"] for k in item.get("keywords", [])], "url": f"{self.base_url}/view/recipe/{item['id']}", }) return { "items": items, "page": page, "per_page": per_page, "total": data.get("count", 0), } def get_recipe(self, recipe_id: int) -> dict: """Fetch a single recipe and return it in the common format.""" r = self.session.get(f"{self.api_url}/api/recipe/{recipe_id}/", timeout=15) r.raise_for_status() data = r.json() # Parse ingredients from all steps ingredients = [] for step in data.get("steps", []): for ing in step.get("ingredients", []): if ing.get("is_header"): ingredients.append({"group": ing.get("note", "")}) continue food_name = "" if ing.get("food"): food_name = ing["food"].get("name", "") unit_name = "" if ing.get("unit"): unit_name = ing["unit"].get("name", "") amount = ing.get("amount", 0) qty_str = "" if amount: qty_str = str(int(amount)) if amount == int(amount) else str(amount) note = ing.get("note", "") if food_name or qty_str or unit_name: ingredients.append({ "quantity": qty_str, "unit": unit_name, "food": food_name, "extra": note, }) # Parse instructions instructions = [] for step in data.get("steps", []): text = step.get("instruction", "").strip() if text: instructions.append(text) # Parse tags tags = [k["name"] for k in data.get("keywords", [])] return { "title": data.get("name", ""), "description": data.get("description", ""), "image_url": data.get("image") or "", "ingredients": ingredients, "instructions": instructions, "tags": tags, "original_url": data.get("source_url", ""), } def update_recipe(self, recipe_id: int, recipe: dict) -> None: """Update an existing recipe from a common-format dict.""" payload = self._build_payload(recipe) r = self.session.put( f"{self.api_url}/api/recipe/{recipe_id}/", json=payload, timeout=15, ) r.raise_for_status() def delete_recipe(self, recipe_id: int) -> None: """Delete a recipe by ID.""" r = self.session.delete(f"{self.api_url}/api/recipe/{recipe_id}/", timeout=10) r.raise_for_status() def find_duplicate(self, url: str, title: str = "") -> dict | None: """Check if a recipe with this source URL already exists.""" if not url and not title: return None search = title or url.split("/")[-1].replace("-", " ") r = self.session.get( f"{self.api_url}/api/recipe/", params={"query": search, "limit": 20}, timeout=10, ) if not r.ok: return None for item in r.json().get("results", []): source = item.get("source_url") or "" desc = item.get("description") or "" if source == url or url in desc: return { "id": item["id"], "name": item["name"], "url": f"{self.base_url}/view/recipe/{item['id']}", } return None def create_recipe(self, recipe: dict) -> dict: """Create a recipe in Tandoor from a scraper result dict. Returns {"id": int, "url": str}. """ payload = self._build_payload(recipe) r = self.session.post( f"{self.api_url}/api/recipe/", json=payload, timeout=15, ) r.raise_for_status() data = r.json() recipe_id = data["id"] # Upload image if available image_url = recipe.get("image_url") if image_url: try: self._upload_image(recipe_id, image_url) except Exception: pass # non-fatal return { "id": recipe_id, "url": f"{self.base_url}/view/recipe/{recipe_id}", } # ------------------------------------------------------------------ # Internal # ------------------------------------------------------------------ def _build_payload(self, recipe: dict) -> dict: description = recipe.get("description", "") original_url = recipe.get("original_url", "") if original_url and original_url not in description: description = f"{description}\n\n{original_url}".strip() # Build ingredients list ingredients = [] order = 0 pending_group = "" for item in recipe.get("ingredients", []): if isinstance(item, dict): if "group" in item and "food" not in item: # Group header ingredients.append({ "food": None, "unit": None, "amount": 0, "note": item["group"], "is_header": True, "no_amount": True, "order": order, }) order += 1 else: ingredients.append(self._build_ingredient(item, order)) order += 1 # Build steps — all ingredients on step 0 instructions = recipe.get("instructions", []) steps = [] for i, text in enumerate(instructions): steps.append({ "instruction": text, "ingredients": ingredients if i == 0 else [], "order": i, }) if not instructions and ingredients: steps.append({ "instruction": "", "ingredients": ingredients, "order": 0, }) # Keywords (tags) tag_names = recipe.get("tags", []) keywords = [{"name": t} for t in tag_names] if tag_names else [] return { "name": recipe["title"], "description": description, "source_url": original_url, "keywords": keywords, "steps": steps, "servings": 1, } def _build_ingredient(self, item: dict, order: int) -> dict: """Build a Tandoor ingredient from a structured dict.""" qty_str = str(item.get("quantity", "")).strip() unit_str = item.get("unit", "").strip() food_str = item.get("food", "").strip() extra = item.get("extra", "").strip() has_structured = bool(qty_str or unit_str) if has_structured and food_str: qty, range_note = _parse_qty(qty_str) if range_note: extra = f"{range_note} {unit_str}; {extra}".strip("; ") if extra else f"{range_note} {unit_str}".strip() return { "food": {"name": food_str}, "unit": {"name": unit_str} if unit_str else None, "amount": qty, "note": extra, "is_header": False, "no_amount": False, "order": order, } else: # No quantity/unit — food-only or note note = food_str if extra: note = f"{note} ({extra})" if note else extra if food_str: return { "food": {"name": food_str}, "unit": None, "amount": 0, "note": extra, "is_header": False, "no_amount": True, "order": order, } else: return { "food": None, "unit": None, "amount": 0, "note": note, "is_header": False, "no_amount": True, "order": order, } def _upload_image(self, recipe_id: int, image_url: str): """Download image from *image_url* and upload it to the recipe.""" img_resp = requests.get(image_url, timeout=30, headers={ "User-Agent": "RecipeImporter/1.0", }) img_resp.raise_for_status() content_type = img_resp.headers.get("Content-Type", "image/jpeg") ext = "jpg" if "png" in content_type: ext = "png" elif "webp" in content_type: ext = "webp" files = { "image": (f"recipe.{ext}", io.BytesIO(img_resp.content), content_type), } r = self.session.put( f"{self.api_url}/api/recipe/{recipe_id}/image/", files=files, timeout=30, ) r.raise_for_status()