added libri too

This commit is contained in:
2026-01-25 20:59:41 +01:00
parent cbfac3c3c4
commit 5aaacbb753
+498 -127
View File
@@ -8,7 +8,7 @@ metadata:
name: calibre-system name: calibre-system
--- ---
# Custom Metadata Providers ConfigMap # Custom Metadata Providers ConfigMap
# Contains moly.hu provider for Hungarian book metadata # Contains Hungarian metadata providers: moly.hu and libri.hu
apiVersion: v1 apiVersion: v1
kind: ConfigMap kind: ConfigMap
metadata: metadata:
@@ -29,8 +29,7 @@ data:
import re import re
import requests import requests
from lxml.html import fromstring from lxml.html import fromstring
from typing import List, Optional from typing import List, Optional, Tuple
from datetime import datetime
from cps.services.Metadata import MetaRecord, MetaSourceInfo, Metadata from cps.services.Metadata import MetaRecord, MetaSourceInfo, Metadata
import cps.logger as logger import cps.logger as logger
@@ -38,6 +37,65 @@ data:
log = logger.create() log = logger.create()
# Letters whose "accent" is part of the glyph and therefore survives Unicode
# decomposition; they have to be mapped explicitly.
_UNDECOMPOSABLE = str.maketrans("łŁ", "lL")


def strip_accents(s: str) -> str:
    """Return *s* lower-cased with diacritics removed, for comparisons only.

    The text is canonically decomposed (NFD) and every combining mark is
    dropped, so both precomposed ("ő") and decomposed ("o" + U+030B) input
    collapse to the same plain letter. This covers the Hungarian alphabet
    as well as other accented Latin letters that may show up in author
    names or titles.

    Args:
        s: Text to normalize. ``None`` or an empty string is accepted.

    Returns:
        The accent-free, lower-cased text, or ``""`` for falsy input.
    """
    # Function-scope import keeps this helper self-contained in the module.
    import unicodedata

    if not s:
        return ""
    decomposed = unicodedata.normalize("NFD", s)
    base_chars = "".join(ch for ch in decomposed if not unicodedata.combining(ch))
    return base_chars.translate(_UNDECOMPOSABLE).lower()
def normalize_title(title: str) -> str:
    """Reduce a book title to a canonical form used only for matching.

    Parenthesised and bracketed fragments are removed, every remaining
    character that is neither a word character nor whitespace becomes a
    space, whitespace runs are collapsed, and the result is passed through
    ``strip_accents``.

    Args:
        title: Raw title text; falsy values yield ``""``.

    Returns:
        The normalized title.
    """
    if not title:
        return ""

    # Order matters: bracketed fragments must go before the punctuation
    # pass turns their delimiters into spaces.
    substitutions = (
        (r'\([^)]*\)', ''),
        (r'\[[^\]]*\]', ''),
        (r'[^\w\s]', ' '),
        (r'\s+', ' '),
    )
    cleaned = title
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)
    return strip_accents(cleaned.strip())
def calculate_relevance(query_title: str, query_author: str,
                        result_title: str, result_authors: List[str]) -> int:
    """Score how well a search result matches the query (lower is better).

    The score starts at 500. The title comparison subtracts 300 (equal),
    200 (one contains the other) or 100 (a query word longer than two
    characters occurs in the result title), and adds 200 when nothing
    matches. If a query author is given, the best-matching result author
    subtracts a further 200 (equal, also in "last first" order), 100
    (containment) or 50 (a name part longer than two characters occurs).

    Args:
        query_title: Title the user searched for.
        query_author: Author the user searched for; may be empty.
        result_title: Title of the candidate result.
        result_authors: Author names of the candidate result.

    Returns:
        A non-negative integer; 0 means exact title and author match.
    """
    score = 500

    norm_query_title = normalize_title(query_title)
    norm_result_title = normalize_title(result_title)

    if not norm_query_title or not norm_result_title:
        # An empty string is a substring of every string, so without this
        # guard an empty title would be rewarded as a containment match.
        score += 200
    elif norm_query_title == norm_result_title:
        score -= 300
    elif norm_query_title in norm_result_title or norm_result_title in norm_query_title:
        score -= 200
    elif any(word in norm_result_title for word in norm_query_title.split() if len(word) > 2):
        score -= 100
    else:
        score += 200

    if query_author and result_authors:
        query_parts = strip_accents(query_author).split()
        norm_query_author = " ".join(query_parts)
        if len(query_parts) >= 2:
            reversed_author = f"{query_parts[-1]} {' '.join(query_parts[:-1])}"
        else:
            reversed_author = norm_query_author

        best_bonus = 0
        if norm_query_author:
            for author in result_authors:
                author_norm = " ".join(strip_accents(author).split())
                if not author_norm:
                    # Blank placeholders would match anything by containment.
                    continue
                if author_norm in (norm_query_author, reversed_author):
                    bonus = 200
                elif norm_query_author in author_norm or author_norm in norm_query_author:
                    bonus = 100
                elif any(part in author_norm for part in query_parts if len(part) > 2):
                    bonus = 50
                else:
                    bonus = 0
                # Keep the strongest match over all authors instead of
                # stopping at the first author that matches at any level.
                best_bonus = max(best_bonus, bonus)
                if best_bonus == 200:
                    break
        score -= best_bonus

    return max(0, score)
class Moly_hu(Metadata): class Moly_hu(Metadata):
__name__ = "Moly.hu" __name__ = "Moly.hu"
__id__ = "moly_hu" __id__ = "moly_hu"
@@ -50,7 +108,6 @@ data:
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:130.0) Gecko/20100101 Firefox/130.0', 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:130.0) Gecko/20100101 Firefox/130.0',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'hu-HU,hu;q=0.9,en;q=0.8', 'Accept-Language': 'hu-HU,hu;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
} }
session = requests.Session() session = requests.Session()
@@ -59,34 +116,31 @@ data:
def search( def search(
self, query: str, generic_cover: str = "", locale: str = "hu" self, query: str, generic_cover: str = "", locale: str = "hu"
) -> Optional[List[MetaRecord]]: ) -> Optional[List[MetaRecord]]:
"""Search moly.hu for books matching the query"""
if not self.active: if not self.active:
return [] return []
val = [] val = []
query_author = ""
query_title = query.strip()
try: try:
# Search for books
search_url = self.SEARCH_URL + requests.utils.quote(query) search_url = self.SEARCH_URL + requests.utils.quote(query)
log.info(f"Moly.hu searching: {search_url}") log.info(f"Moly.hu searching: {search_url}")
response = self.session.get(search_url, timeout=15) response = self.session.get(search_url, timeout=15)
response.raise_for_status() response.raise_for_status()
# Parse search results
root = fromstring(response.text) root = fromstring(response.text)
book_links = self._parse_search_results(root, query) book_data = self._parse_search_results(root, query_title, query_author)
if not book_links: if not book_data:
log.info(f"Moly.hu: No results found for '{query}'") log.info(f"Moly.hu: No results found for '{query}'")
return [] return []
# Fetch details for each book (max 5)
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
futures = { futures = {
executor.submit(self._get_book_details, link, idx): idx executor.submit(self._get_book_details, url, idx, query_title, query_author): idx
for idx, link in enumerate(book_links[:5]) for idx, (url, _) in enumerate(book_data[:5])
} }
for future in concurrent.futures.as_completed(futures, timeout=20): for future in concurrent.futures.as_completed(futures, timeout=20):
@@ -107,35 +161,42 @@ data:
log.error_or_exception(f"Moly.hu search error: {e}") log.error_or_exception(f"Moly.hu search error: {e}")
return [] return []
# Sort by relevance (order from search results) val.sort(key=lambda x: getattr(x, '_relevance_score', 500))
val.sort(key=lambda x: x.source.id if hasattr(x, 'source') else 0)
return val return val
def _parse_search_results(self, root, query: str) -> List[str]: def _parse_search_results(self, root, query_title: str, query_author: str) -> List[Tuple[str, int]]:
"""Extract book URLs from search results page""" results = root.xpath('//a[@class="book_selector"]')
results = root.xpath('//a[@class="book_selector"]/@href') book_data = []
book_urls = []
for href in results: for result in results:
if href and href not in book_urls: href = result.get('href')
book_urls.append(self.BASE_URL + href) if not href:
continue
log.info(f"Moly.hu found {len(book_urls)} results") text = result.text_content().strip() if result.text_content() else ""
return book_urls result_author = ""
result_title = text
if ':' in text:
parts = text.split(':', 1)
result_author = parts[0].strip()
result_title = parts[1].strip()
def _get_book_details(self, url: str, index: int) -> Optional[MetaRecord]: relevance = calculate_relevance(query_title, query_author, result_title, [result_author])
"""Fetch and parse book details from a moly.hu book page""" url = self.BASE_URL + href
book_data.append((url, relevance))
book_data.sort(key=lambda x: x[1])
log.info(f"Moly.hu found {len(book_data)} results")
return book_data
def _get_book_details(self, url: str, index: int, query_title: str, query_author: str) -> Optional[MetaRecord]:
try: try:
response = self.session.get(url, timeout=15) response = self.session.get(url, timeout=15)
response.raise_for_status() response.raise_for_status()
# Clean up HTML raw = response.text.replace('<em>', '').replace('</em>', '')
raw = response.text
raw = raw.replace('<em>', '').replace('</em>', '')
root = fromstring(raw) root = fromstring(raw)
# Parse all fields
title = self._parse_title(root) title = self._parse_title(root)
authors = self._parse_authors(root) authors = self._parse_authors(root)
@@ -157,7 +218,8 @@ data:
identifiers={"moly_hu": moly_id}, identifiers={"moly_hu": moly_id},
) )
# Optional fields match._relevance_score = calculate_relevance(query_title, query_author, title, authors)
match.description = self._parse_description(root) match.description = self._parse_description(root)
match.cover = self._parse_cover(root) match.cover = self._parse_cover(root)
match.publisher = self._parse_publisher(root) match.publisher = self._parse_publisher(root)
@@ -165,7 +227,6 @@ data:
match.rating = self._parse_rating(root) match.rating = self._parse_rating(root)
match.tags = self._parse_tags(root) match.tags = self._parse_tags(root)
# Series info
series_info = self._parse_series(root) series_info = self._parse_series(root)
if series_info: if series_info:
match.series = series_info[0] match.series = series_info[0]
@@ -174,7 +235,6 @@ data:
except (ValueError, IndexError): except (ValueError, IndexError):
match.series_index = 1 match.series_index = 1
# ISBN
isbn = self._parse_isbn(root) isbn = self._parse_isbn(root)
if isbn: if isbn:
match.identifiers["isbn"] = isbn match.identifiers["isbn"] = isbn
@@ -186,7 +246,6 @@ data:
return None return None
def _parse_moly_id(self, url: str) -> Optional[str]: def _parse_moly_id(self, url: str) -> Optional[str]:
"""Extract moly.hu book ID from URL"""
try: try:
m = re.search(r'/konyvek/(.*)', url) m = re.search(r'/konyvek/(.*)', url)
if m: if m:
@@ -196,7 +255,6 @@ data:
return None return None
def _parse_title(self, root) -> Optional[str]: def _parse_title(self, root) -> Optional[str]:
"""Parse book title"""
title_node = root.xpath('//*[@id="content"]//*[@class="fn"]/text()') title_node = root.xpath('//*[@id="content"]//*[@class="fn"]/text()')
if not title_node: if not title_node:
title_node = root.xpath('//*[@id="content"]//*[@class="item"]/text()') title_node = root.xpath('//*[@id="content"]//*[@class="item"]/text()')
@@ -205,26 +263,19 @@ data:
return None return None
def _parse_authors(self, root) -> List[str]: def _parse_authors(self, root) -> List[str]:
"""Parse author names"""
author_nodes = root.xpath('//*[@id="content"]//div[@class="authors"]/a/text()') author_nodes = root.xpath('//*[@id="content"]//div[@class="authors"]/a/text()')
if author_nodes: if author_nodes:
return [str(author).strip() for author in author_nodes] return [str(author).strip() for author in author_nodes]
return [] return []
def _parse_description(self, root) -> Optional[str]: def _parse_description(self, root) -> Optional[str]:
"""Parse book description/comments""" description_node = root.xpath('//*[@id="content"]//*[@class="text" and @id="full_description"]/p/text()')
description_node = root.xpath(
'//*[@id="content"]//*[@class="text" and @id="full_description"]/p/text()'
)
if not description_node: if not description_node:
description_node = root.xpath('//*[@id="content"]//*[@class="text"]/p/text()') description_node = root.xpath('//*[@id="content"]//*[@class="text"]/p/text()')
if not description_node: if not description_node:
description_node = root.xpath( description_node = root.xpath('//*[@id="content"]//*[@class="text shrinkable"]/p/text()')
'//*[@id="content"]//*[@class="text shrinkable"]/p/text()'
)
if description_node: if description_node:
# Clean up description
desc = '\n'.join(description_node) desc = '\n'.join(description_node)
desc = desc.replace('\n\n', '\n').replace('\n \n', '\n') desc = desc.replace('\n\n', '\n').replace('\n \n', '\n')
desc = desc.replace('Vigyázat! Cselekményleírást tartalmaz.\n', '') desc = desc.replace('Vigyázat! Cselekményleírást tartalmaz.\n', '')
@@ -232,7 +283,6 @@ data:
return None return None
def _parse_cover(self, root) -> Optional[str]: def _parse_cover(self, root) -> Optional[str]:
"""Parse cover image URL"""
cover_nodes = root.xpath('(//*[@class="coverbox"]//a/@href)[1]') cover_nodes = root.xpath('(//*[@class="coverbox"]//a/@href)[1]')
if cover_nodes: if cover_nodes:
cover_url = cover_nodes[0] cover_url = cover_nodes[0]
@@ -240,7 +290,6 @@ data:
cover_url = self.BASE_URL + cover_url cover_url = self.BASE_URL + cover_url
return cover_url return cover_url
# Fallback: try img src directly
img_nodes = root.xpath('//*[@class="coverbox"]//img/@src') img_nodes = root.xpath('//*[@class="coverbox"]//img/@src')
if img_nodes: if img_nodes:
img_url = img_nodes[0] img_url = img_nodes[0]
@@ -250,14 +299,9 @@ data:
return None return None
def _parse_publisher(self, root) -> Optional[str]: def _parse_publisher(self, root) -> Optional[str]:
"""Parse publisher name""" publisher_node_1 = root.xpath('//*[@id="content"]//*[@class="items"]/div/div[1]/a/text()')
publisher_node_1 = root.xpath(
'//*[@id="content"]//*[@class="items"]/div/div[1]/a/text()'
)
if publisher_node_1 and publisher_node_1[0] == '+': if publisher_node_1 and publisher_node_1[0] == '+':
publisher_node = root.xpath( publisher_node = root.xpath('//*[@id="content"]//*[@class="items"]/div/div[2]/a/text()')
'//*[@id="content"]//*[@class="items"]/div/div[2]/a/text()'
)
else: else:
publisher_node = publisher_node_1 publisher_node = publisher_node_1
@@ -266,14 +310,9 @@ data:
return None return None
def _parse_published_date(self, root) -> Optional[str]: def _parse_published_date(self, root) -> Optional[str]:
"""Parse publication date (year)""" publication_node_1 = root.xpath('//*[@id="content"]//*[@class="items"]/div/div[1]/text()')
publication_node_1 = root.xpath(
'//*[@id="content"]//*[@class="items"]/div/div[1]/text()'
)
if not publication_node_1: if not publication_node_1:
publication_node = root.xpath( publication_node = root.xpath('//*[@id="content"]//*[@class="items"]/div/div[2]/text()')
'//*[@id="content"]//*[@class="items"]/div/div[2]/text()'
)
else: else:
publication_node = publication_node_1 publication_node = publication_node_1
@@ -284,13 +323,9 @@ data:
return None return None
def _parse_rating(self, root) -> int: def _parse_rating(self, root) -> int:
"""Parse rating (converted to 0-5 scale)""" rating_node = root.xpath('//*[@id="content"]//*[@class="rating"]//*[@class="like_count"]/text()')
rating_node = root.xpath(
'//*[@id="content"]//*[@class="rating"]//*[@class="like_count"]/text()'
)
if rating_node: if rating_node:
try: try:
# Moly.hu uses percentage, convert to 0-5 scale
percentage = float(rating_node[0].strip('%').strip()) percentage = float(rating_node[0].strip('%').strip())
return round(percentage * 0.05) return round(percentage * 0.05)
except (ValueError, IndexError): except (ValueError, IndexError):
@@ -298,19 +333,15 @@ data:
return 0 return 0
def _parse_tags(self, root) -> List[str]: def _parse_tags(self, root) -> List[str]:
"""Parse tags/genres"""
# Genre tags (in brackets)
tags_genre = root.xpath('//*[@id="book_tags"]//*[@class="tag genre"]/text()') tags_genre = root.xpath('//*[@id="book_tags"]//*[@class="tag genre"]/text()')
tags_genre = [f"[{str(t).strip()}]" for t in tags_genre if str(t).strip()] tags_genre = [f"[{str(t).strip()}]" for t in tags_genre if str(t).strip()]
# Regular tags
tags_regular = root.xpath('//*[@id="book_tags"]//*[@class="tag"]/text()') tags_regular = root.xpath('//*[@id="book_tags"]//*[@class="tag"]/text()')
tags_regular = [str(t).strip() for t in tags_regular if str(t).strip()] tags_regular = [str(t).strip() for t in tags_regular if str(t).strip()]
return tags_genre + tags_regular return tags_genre + tags_regular
def _parse_series(self, root) -> Optional[List[str]]: def _parse_series(self, root) -> Optional[List[str]]:
"""Parse series name and index"""
series_node = root.xpath('//*[@id="content"]//*[@class="action"]/text()') series_node = root.xpath('//*[@id="content"]//*[@class="action"]/text()')
if not series_node: if not series_node:
@@ -319,7 +350,6 @@ data:
series_text = series_node[0].strip('().') series_text = series_node[0].strip('().')
parts = series_text.rsplit(' ', 1) parts = series_text.rsplit(' ', 1)
# Check if it's actually edition info, not series
if len(parts) > 1 and parts[1] == 'kiadás': if len(parts) > 1 and parts[1] == 'kiadás':
return None return None
@@ -331,26 +361,406 @@ data:
return None return None
def _parse_isbn(self, root) -> Optional[str]: def _parse_isbn(self, root) -> Optional[str]:
"""Parse ISBN""" isbn_nodes = root.xpath('//*[@id="content"]//*[@class="items"]/div/div[2]/text()')
# Try first location
isbn_nodes = root.xpath(
'//*[@id="content"]//*[@class="items"]/div/div[2]/text()'
)
for value in isbn_nodes: for value in isbn_nodes:
m = re.search(r'(\d{13}|\d{10})', value) m = re.search(r'(\d{13}|\d{10})', value)
if m: if m:
return m.group(1) return m.group(1)
# Try second location isbn_nodes = root.xpath('//*[@id="content"]//*[@class="items"]/div/div[3]/text()')
isbn_nodes = root.xpath(
'//*[@id="content"]//*[@class="items"]/div/div[3]/text()'
)
for value in isbn_nodes: for value in isbn_nodes:
m = re.search(r'(\d{13}|\d{10})', value) m = re.search(r'(\d{13}|\d{10})', value)
if m: if m:
return m.group(1) return m.group(1)
return None return None
libri_hu.py: |
# -*- coding: utf-8 -*-
# Calibre-Web Automated - Libri.hu Metadata Provider
# Based on Calibre plugin by Hoffer Csaba, Kloon & Hokutya
# Adapted for CWA
# SPDX-License-Identifier: GPL-3.0-or-later
import concurrent.futures
import re
import requests
from lxml.html import fromstring, tostring
from lxml import html as lh
from typing import List, Optional, Tuple, Dict
from cps.services.Metadata import MetaRecord, MetaSourceInfo, Metadata
import cps.logger as logger
log = logger.create()
# Letters whose "accent" is part of the glyph and therefore survives Unicode
# decomposition; they have to be mapped explicitly.
_UNDECOMPOSABLE = str.maketrans("łŁ", "lL")


def strip_accents(s: str) -> str:
    """Return *s* lower-cased with diacritics removed, for comparisons only.

    The text is canonically decomposed (NFD) and every combining mark is
    dropped, so both precomposed ("ő") and decomposed ("o" + U+030B) input
    collapse to the same plain letter. This covers the Hungarian and Polish
    letters the previous lookup table listed, plus any other accented Latin
    letter.

    Args:
        s: Text to normalize. ``None`` or an empty string is accepted.

    Returns:
        The accent-free, lower-cased text, or ``""`` for falsy input.
    """
    # Function-scope import keeps this helper self-contained in the module.
    import unicodedata

    if not s:
        return ""
    decomposed = unicodedata.normalize("NFD", s)
    base_chars = "".join(ch for ch in decomposed if not unicodedata.combining(ch))
    return base_chars.translate(_UNDECOMPOSABLE).lower()
def normalize_title(title: str) -> str:
    """Reduce a book title to a canonical form used only for matching.

    Parenthesised and bracketed fragments are removed, every remaining
    character that is neither a word character nor whitespace becomes a
    space, whitespace runs are collapsed, and the result is passed through
    ``strip_accents``.

    Args:
        title: Raw title text; falsy values yield ``""``.

    Returns:
        The normalized title.
    """
    if not title:
        return ""

    # Order matters: bracketed fragments must go before the punctuation
    # pass turns their delimiters into spaces.
    substitutions = (
        (r'\([^)]*\)', ''),
        (r'\[[^\]]*\]', ''),
        (r'[^\w\s]', ' '),
        (r'\s+', ' '),
    )
    cleaned = title
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)
    return strip_accents(cleaned.strip())
def calculate_relevance(query_title: str, query_author: str,
                        result_title: str, result_authors: List[str]) -> int:
    """Score how well a search result matches the query (lower is better).

    The score starts at 500. The title comparison subtracts 300 (equal),
    200 (one contains the other) or 100 (a query word longer than two
    characters occurs in the result title), and adds 200 when nothing
    matches. If a query author is given, the best-matching result author
    subtracts a further 200 (equal, also in "last first" order), 100
    (containment) or 50 (a name part longer than two characters occurs).

    Args:
        query_title: Title the user searched for.
        query_author: Author the user searched for; may be empty.
        result_title: Title of the candidate result.
        result_authors: Author names of the candidate result.

    Returns:
        A non-negative integer; 0 means exact title and author match.
    """
    score = 500

    norm_query_title = normalize_title(query_title)
    norm_result_title = normalize_title(result_title)

    if not norm_query_title or not norm_result_title:
        # An empty string is a substring of every string, so without this
        # guard an empty title would be rewarded as a containment match.
        score += 200
    elif norm_query_title == norm_result_title:
        score -= 300
    elif norm_query_title in norm_result_title or norm_result_title in norm_query_title:
        score -= 200
    elif any(word in norm_result_title for word in norm_query_title.split() if len(word) > 2):
        score -= 100
    else:
        score += 200

    if query_author and result_authors:
        query_parts = strip_accents(query_author).split()
        norm_query_author = " ".join(query_parts)
        if len(query_parts) >= 2:
            reversed_author = f"{query_parts[-1]} {' '.join(query_parts[:-1])}"
        else:
            reversed_author = norm_query_author

        best_bonus = 0
        if norm_query_author:
            for author in result_authors:
                author_norm = " ".join(strip_accents(author).split())
                if not author_norm:
                    # Blank placeholders would match anything by containment.
                    continue
                if author_norm in (norm_query_author, reversed_author):
                    bonus = 200
                elif norm_query_author in author_norm or author_norm in norm_query_author:
                    bonus = 100
                elif any(part in author_norm for part in query_parts if len(part) > 2):
                    bonus = 50
                else:
                    bonus = 0
                # Keep the strongest match over all authors instead of
                # stopping at the first author that matches at any level.
                best_bonus = max(best_bonus, bonus)
                if best_bonus == 200:
                    break
        score -= best_bonus

    return max(0, score)
class Libri_hu(Metadata):
    """Metadata provider that scrapes search results and book pages on libri.hu.

    ``search`` queries the site's result list, fetches the detail page of the
    first few hits in parallel and returns the parsed records ordered by
    ``calculate_relevance``. All ``_parse_*`` helpers take an lxml element
    tree and try several XPath selectors, returning a neutral value
    (``None``, ``[]``, ``0``) when nothing matches.
    """

    __name__ = "Libri.hu"
    __id__ = "libri_hu"

    BASE_URL = "https://www.libri.hu"
    BOOK_URL = BASE_URL + "/konyv"
    SEARCH_URL = BASE_URL + "/talalati-lista"

    # How many search hits are kept, and how many of those get a detail fetch.
    MAX_SEARCH_RESULTS = 10
    MAX_DETAIL_FETCHES = 5

    # Hungarian language names (lower case) as shown on the product page,
    # mapped to two-letter language codes.
    LANGUAGE_CODES = {
        'magyar': 'hu',
        'angol': 'en',
        'amerikai': 'en',
        'német': 'de',
        'francia': 'fr',
        'olasz': 'it',
        'spanyol': 'es',
        'orosz': 'ru',
        'török': 'tr',
        'görög': 'el',
        'kínai': 'zh',
        'japán': 'ja',
    }

    headers = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:130.0) Gecko/20100101 Firefox/130.0',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'hu-HU,hu;q=0.9,en;q=0.8',
    }

    # One HTTP session shared by every request of this provider.
    session = requests.Session()
    session.headers.update(headers)

    def search(
        self, query: str, generic_cover: str = "", locale: str = "hu"
    ) -> Optional[List[MetaRecord]]:
        """Search libri.hu and return matching records, best match first.

        Args:
            query: Free-text search string; it is also used as the title
                for relevance scoring.
            generic_cover: Not used by this provider.
            locale: Not used by this provider.

        Returns:
            A list of records sorted by ascending relevance score. An empty
            list is returned when the provider is inactive, nothing is found,
            or the search request itself fails.
        """
        # NOTE(review): ``active`` is not defined in this class; presumably
        # it comes from the ``Metadata`` base class.
        if not self.active:
            return []

        val = []
        # The query is not split into author and title, so author scoring
        # is effectively disabled.
        query_author = ""
        query_title = query.strip()

        try:
            search_url = f"{self.SEARCH_URL}?kereses={requests.utils.quote(query)}"
            log.info(f"Libri.hu searching: {search_url}")

            response = self.session.get(search_url, timeout=15)
            response.raise_for_status()

            root = fromstring(response.text)
            book_data = self._parse_search_results(root, query_title, query_author)

            if not book_data:
                log.info(f"Libri.hu: No results found for '{query}'")
                return []

            with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
                futures = {
                    executor.submit(self._get_book_details, url, idx, query_title, query_author): idx
                    for idx, (url, _) in enumerate(book_data[:self.MAX_DETAIL_FETCHES])
                }
                try:
                    for future in concurrent.futures.as_completed(futures, timeout=20):
                        try:
                            result = future.result()
                            if result:
                                val.append(result)
                        except Exception as e:
                            log.warning(f"Libri.hu worker error: {e}")
                except concurrent.futures.TimeoutError:
                    # Keep the records that did arrive instead of throwing
                    # the whole search away because one page was slow.
                    log.warning("Libri.hu: timed out waiting for book details, returning partial results")
                    for pending in futures:
                        pending.cancel()

        except requests.exceptions.Timeout:
            log.warning("Libri.hu search timed out")
            return []
        except requests.exceptions.HTTPError as e:
            log.error(f"Libri.hu HTTP error: {e}")
            return []
        except Exception as e:
            log.error_or_exception(f"Libri.hu search error: {e}")
            return []

        val.sort(key=lambda x: getattr(x, '_relevance_score', 500))
        return val

    def _absolute_url(self, href: str) -> str:
        """Turn an href taken from a libri.hu page into an absolute URL."""
        if href.startswith('//'):
            # Protocol-relative link, e.g. to a CDN host.
            return 'https:' + href
        if href.startswith('http'):
            return href
        if not href.startswith('/'):
            href = '/' + href
        return self.BASE_URL + href

    def _parse_search_results(self, root, query_title: str, query_author: str) -> List[Tuple[str, int]]:
        """Extract book page URLs from a search result page.

        The result page is not mined for titles, so no text-based relevance
        can be computed here; each URL is paired with a score derived from
        its position on the page (0, 10, 20, ...). ``query_title`` and
        ``query_author`` are accepted but unused.

        Returns:
            Up to ``MAX_SEARCH_RESULTS`` ``(url, score)`` tuples in page order.
        """
        book_links = root.xpath('//*[@id="book-list-result-items"]//h4[@class="book"]/a/@href')
        if not book_links:
            # Fallback: any link that looks like a book detail page.
            book_links = [
                link for link in root.xpath('//a[contains(@href, "/konyv/")]/@href')
                if '/konyv/' in link and '.html' in link
            ]

        # dict.fromkeys removes duplicates while keeping page order, which
        # the position-based score below depends on.
        unique_links = list(dict.fromkeys(book_links))

        book_data = [
            (self._absolute_url(href), rank * 10)
            for rank, href in enumerate(unique_links[:self.MAX_SEARCH_RESULTS])
        ]

        log.info(f"Libri.hu found {len(book_data)} results")
        return book_data

    def _get_book_details(self, url: str, index: int, query_title: str, query_author: str) -> Optional[MetaRecord]:
        """Fetch one book page and build a record from it.

        Args:
            url: Absolute URL of the book page.
            index: Position of the book in the search results (unused).
            query_title: Title used for relevance scoring.
            query_author: Author used for relevance scoring.

        Returns:
            The populated record, or ``None`` when the page has no title or
            any error occurs (the error is logged, never raised).
        """
        try:
            response = self.session.get(url, timeout=15)
            response.raise_for_status()

            # Decode explicitly as UTF-8 rather than trusting the detected
            # response encoding.
            root = lh.document_fromstring(response.content.decode('utf-8', errors='replace'))

            book_props = self._parse_book_properties(root)

            title = self._parse_title(root)
            authors = self._parse_authors(root)

            if not title:
                return None

            libri_id = self._parse_libri_id(url)

            match = MetaRecord(
                id=libri_id,
                title=title,
                authors=authors if authors else [""],
                source=MetaSourceInfo(
                    id=self.__id__,
                    description="Libri.hu - Könyvesbolt",
                    link=self.BASE_URL
                ),
                url=url,
                identifiers={"libri_hu": libri_id},
            )

            # Read back by search() to order the final result list.
            match._relevance_score = calculate_relevance(query_title, query_author, title, authors)

            isbn = book_props.get('ISBN', '').strip()
            if isbn:
                match.identifiers["isbn"] = isbn

            publisher = book_props.get('Kiadó', '').strip()
            if publisher:
                match.publisher = publisher

            pub_year = book_props.get('Kiadás éve', '').strip()
            if pub_year:
                match.publishedDate = pub_year

            series = book_props.get('Sorozat', '').strip()
            if series:
                match.series = series

            lang = book_props.get('Nyelv', '').strip().lower()
            if lang:
                match.languages = [self._translate_language(lang)]

            match.description = self._parse_description(root)
            match.cover = self._parse_cover(root)
            match.rating = self._parse_rating(root)
            match.tags = self._parse_tags(root)

            return match

        except Exception as e:
            log.warning(f"Libri.hu error fetching {url}: {e}")
            return None

    def _parse_book_properties(self, root) -> Dict[str, str]:
        """Collect label/value pairs from the product properties table(s).

        Returns:
            A dict keyed by the label text with any trailing colon removed;
            rows with an empty label or value are skipped.
        """
        book_properties = {}

        tables = root.xpath('//*[@id="productPageMainItem"]//table')
        if not tables:
            tables = root.xpath('//table[contains(@class, "product")]')

        for table in tables:
            for row in table.findall('.//tr'):
                # Header cells first so a <th> label precedes its <td> value.
                cells = row.findall('.//th') + row.findall('.//td')
                if len(cells) >= 2:
                    key = cells[0].text_content().strip().rstrip(':')
                    value = cells[1].text_content().strip()
                    if key and value:
                        book_properties[key] = value

        return book_properties

    def _parse_libri_id(self, url: str) -> Optional[str]:
        """Derive the book identifier from the ``/konyv/`` part of *url*.

        Returns:
            The path after ``/konyv/`` without the ``.html`` suffix, else the
            first path segment after ``/konyv/``, else ``None``.
        """
        if not isinstance(url, str):
            return None
        for pattern in (r'/konyv/(.*)\.html', r'/konyv/([^/]+)'):
            m = re.search(pattern, url)
            if m:
                return m.group(1)
        return None

    def _parse_title(self, root) -> Optional[str]:
        """Return the book title, with the subtitle appended when present."""
        selectors = [
            '//*[@id="productPageMainItem"]//*[@class="h2 mb-2"]/text()',
            '//*[@id="productPageMainItem"]//h1/text()',
            '//h1[@class="book-title"]/text()',
            '//meta[@property="og:title"]/@content',
        ]

        for selector in selectors:
            nodes = root.xpath(selector)
            if not nodes:
                continue
            title = nodes[0].strip()
            if not title:
                continue
            subtitle_nodes = root.xpath('//*[@id="productPageMainItem"]//*[@class="subtitle"]/text()')
            if subtitle_nodes:
                title = f"{title} {subtitle_nodes[0].strip()}"
            return title

        return None

    def _parse_authors(self, root) -> List[str]:
        """Return the author names from the first selector that yields any."""
        selectors = [
            '//*[@id="productPageMainItem"]/div/div/div[2]/p[1]/a/text()',
            '//*[@id="productPageMainItem"]//a[contains(@href, "/szerzo/")]/text()',
            '//a[@class="author"]/text()',
        ]

        for selector in selectors:
            # Only separator hyphens at the edges are removed; hyphens inside
            # a name (e.g. "Szent-Györgyi") must be kept.
            names = (str(node).strip(' \t\r\n-') for node in root.xpath(selector))
            authors = [name for name in names if name]
            if authors:
                return authors

        return []

    def _parse_description(self, root) -> Optional[str]:
        """Return the text of the first non-empty description element."""
        selectors = [
            '//*[@id="product-description"]',
            '//*[@class="description"]',
            '//*[@itemprop="description"]',
        ]

        for selector in selectors:
            nodes = root.xpath(selector)
            if nodes:
                text = nodes[0].text_content().strip()
                if text:
                    return text

        return None

    def _parse_cover(self, root) -> Optional[str]:
        """Return an absolute cover image URL, or ``None`` if none is found."""
        selectors = [
            '//*[@property="og:image"]/@content',
            '//*[@class="cover"]//img/@src',
            '//*[@id="productPageMainItem"]//img/@src',
        ]

        for selector in selectors:
            nodes = root.xpath(selector)
            if nodes:
                url = nodes[0].strip()
                if url:
                    return self._absolute_url(url)

        return None

    def _parse_rating(self, root) -> int:
        """Return the page's ``ratingValue`` rounded to an integer, or 0."""
        nodes = root.xpath('//*[@id="productPageMainItem"]//*[@itemprop="ratingValue"]/@content')
        if nodes:
            try:
                # Accept a decimal comma as well as a decimal point.
                return round(float(nodes[0].strip().replace(',', '.')))
            except ValueError:
                # Best effort: an unparsable rating counts as "no rating".
                pass
        return 0

    def _parse_tags(self, root) -> List[str]:
        """Return the lower-cased breadcrumb entries as tags."""
        nodes = root.xpath('//*[@id="navigationBar"]//text()')
        if not nodes:
            return []
        tags = [tag.strip().lower() for tag in nodes if tag.strip()]
        # Drop separators and the home link.
        return [t for t in tags if t and t not in ['>', '/', 'főoldal', 'home']]

    def _translate_language(self, lang: str) -> str:
        """Map a Hungarian language name to its code; unknown names give 'hu'."""
        return self.LANGUAGE_CODES.get(lang.lower(), 'hu')
--- ---
# Calibre-Web-Automated Deployment # Calibre-Web-Automated Deployment
apiVersion: apps/v1 apiVersion: apps/v1
@@ -377,7 +787,7 @@ spec:
annotations: annotations:
# Version checker pattern - CWA uses semantic versioning # Version checker pattern - CWA uses semantic versioning
match-regex.version-checker.io/calibre-web-automated: '^V?[0-9]+\.[0-9]+\.[0-9]+$' match-regex.version-checker.io/calibre-web-automated: '^V?[0-9]+\.[0-9]+\.[0-9]+$'
# Force rollout when ConfigMap changes (update this hash when modifying providers) # Force rollout when ConfigMap changes
configmap.reloader.stakater.com/reload: "calibre-custom-metadata-providers" configmap.reloader.stakater.com/reload: "calibre-custom-metadata-providers"
spec: spec:
containers: containers:
@@ -391,13 +801,10 @@ spec:
value: "1000" value: "1000"
- name: TZ - name: TZ
value: Europe/Budapest value: Europe/Budapest
# Use default port 8083
- name: CWA_PORT_OVERRIDE - name: CWA_PORT_OVERRIDE
value: "8083" value: "8083"
# Disable WAL mode if on network share (set to true if using NFS)
- name: NETWORK_SHARE_MODE - name: NETWORK_SHARE_MODE
value: "false" value: "false"
# Number of proxies in chain (Cloudflare -> nginx-ingress -> app)
- name: TRUSTED_PROXY_COUNT - name: TRUSTED_PROXY_COUNT
value: "2" value: "2"
ports: ports:
@@ -433,38 +840,35 @@ spec:
port: http port: http
periodSeconds: 10 periodSeconds: 10
timeoutSeconds: 5 timeoutSeconds: 5
# CWA can take time to initialize, especially first run
failureThreshold: 60 failureThreshold: 60
volumeMounts: volumeMounts:
# Config directory for app database, logs, processed books backup
- name: config - name: config
mountPath: /config mountPath: /config
# Book ingest folder - files here are DELETED after processing
- name: ingest - name: ingest
mountPath: /cwa-book-ingest mountPath: /cwa-book-ingest
# Calibre library - your existing library location
- name: library - name: library
mountPath: /calibre-library mountPath: /calibre-library
# Custom metadata providers (moly.hu) # Hungarian metadata providers
- name: custom-metadata-providers - name: custom-metadata-providers
mountPath: /app/calibre-web-automated/cps/metadata_provider/moly_hu.py mountPath: /app/calibre-web-automated/cps/metadata_provider/moly_hu.py
subPath: moly_hu.py subPath: moly_hu.py
readOnly: true readOnly: true
- name: custom-metadata-providers
mountPath: /app/calibre-web-automated/cps/metadata_provider/libri_hu.py
subPath: libri_hu.py
readOnly: true
volumes: volumes:
- name: config - name: config
persistentVolumeClaim: persistentVolumeClaim:
claimName: calibre-web-automated-config claimName: calibre-web-automated-config
# Ingest folder on hostPath for easy file dropping
- name: ingest - name: ingest
hostPath: hostPath:
path: /mnt/4_hdd/data/calibre-ingest path: /mnt/4_hdd/data/calibre-ingest
type: DirectoryOrCreate type: DirectoryOrCreate
# Your existing Calibre library location
- name: library - name: library
hostPath: hostPath:
path: /mnt/4_hdd/data/calibre path: /mnt/4_hdd/data/calibre
type: DirectoryOrCreate type: DirectoryOrCreate
# Custom metadata providers from ConfigMap
- name: custom-metadata-providers - name: custom-metadata-providers
configMap: configMap:
name: calibre-custom-metadata-providers name: calibre-custom-metadata-providers
@@ -489,7 +893,7 @@ spec:
app.kubernetes.io/instance: calibre app.kubernetes.io/instance: calibre
app.kubernetes.io/name: calibre-web-automated app.kubernetes.io/name: calibre-web-automated
--- ---
# Main Ingress (books.dooplex.hu - primary reading interface) # Main Ingress (books.dooplex.hu)
apiVersion: networking.k8s.io/v1 apiVersion: networking.k8s.io/v1
kind: Ingress kind: Ingress
metadata: metadata:
@@ -505,7 +909,6 @@ metadata:
nginx.ingress.kubernetes.io/proxy-read-timeout: "600" nginx.ingress.kubernetes.io/proxy-read-timeout: "600"
nginx.ingress.kubernetes.io/proxy-send-timeout: "600" nginx.ingress.kubernetes.io/proxy-send-timeout: "600"
nginx.ingress.kubernetes.io/ssl-redirect: "true" nginx.ingress.kubernetes.io/ssl-redirect: "true"
# Forward auth headers for Authentik integration
nginx.ingress.kubernetes.io/auth-response-headers: Set-Cookie,X-authentik-username,X-authentik-groups,X-authentik-email,X-authentik-name,X-authentik-uid nginx.ingress.kubernetes.io/auth-response-headers: Set-Cookie,X-authentik-username,X-authentik-groups,X-authentik-email,X-authentik-name,X-authentik-uid
nginx.ingress.kubernetes.io/auth-snippet: proxy_set_header X-Forwarded-Host $http_host; nginx.ingress.kubernetes.io/auth-snippet: proxy_set_header X-Forwarded-Host $http_host;
nginx.ingress.kubernetes.io/configuration-snippet: | nginx.ingress.kubernetes.io/configuration-snippet: |
@@ -544,7 +947,7 @@ spec:
port: port:
number: 8083 number: 8083
--- ---
# Config PVC - stores app.db, logs, processed_books backup # Config PVC
apiVersion: v1 apiVersion: v1
kind: PersistentVolumeClaim kind: PersistentVolumeClaim
metadata: metadata:
@@ -561,36 +964,4 @@ spec:
storageClassName: longhorn storageClassName: longhorn
resources: resources:
requests: requests:
# Larger than typical - stores backup of processed books by default
storage: 10Gi storage: 10Gi
---
# Optional: Authentik integration for SSO
# Uncomment and configure if using Authentik proxy authentication
# apiVersion: networking.k8s.io/v1
# kind: Ingress
# metadata:
# name: calibre-web-automated-auth
# namespace: calibre-system
# annotations:
# cert-manager.io/cluster-issuer: letsencrypt-prod
# nginx.ingress.kubernetes.io/auth-url: http://authentik-outpost-proxy.authentik-system.svc.cluster.local:9000/outpost.goauthentik.io/auth/nginx
# nginx.ingress.kubernetes.io/auth-signin: https://auth.dooplex.hu/outpost.goauthentik.io/start?rd=$escaped_request_uri
# nginx.ingress.kubernetes.io/auth-response-headers: Set-Cookie,X-authentik-username,X-authentik-groups,X-authentik-email,X-authentik-name,X-authentik-uid
# nginx.ingress.kubernetes.io/auth-snippet: proxy_set_header X-Forwarded-Host $http_host;
# spec:
# ingressClassName: nginx-internal
# tls:
# - hosts:
# - books.dooplex.hu
# secretName: calibre-web-automated-tls
# rules:
# - host: books.dooplex.hu
# http:
# paths:
# - path: /
# pathType: Prefix
# backend:
# service:
# name: calibre-web-automated
# port:
# number: 8083