d2dd837f26
A Python CLI tool for generating financial reports from Paperless-ngx: - Phase 1 (MVP): Config handling, Paperless API client with auth and pagination, custom fields extraction, tag-based summation, CLI output - Phase 2 (Grouping): Multiple grouping criteria (tag, correspondent, category, payment type, month, quarter, year), percentage distribution - Phase 3 (Reports): HTML reports with Chart.js diagrams (doughnut, bar, line charts), PDF export via WeasyPrint, JSON and CSV export - Phase 4 (Comfort): Automatic tag ID resolution, disk caching with diskcache, colorized logging, comprehensive error handling Features: - Flexible date filtering (year, month, date range) - Period comparison with change analysis - Swiss franc formatting (CHF with apostrophe separators) - Interactive HTML reports with sortable tables and document links - Multiple output formats (CLI, HTML, PDF, JSON, CSV)
538 lines
17 KiB
Python
538 lines
17 KiB
Python
"""
|
|
Paperless-ngx API Client.
|
|
|
|
Handhabt die Kommunikation mit der Paperless REST-API inkl. Paginierung und Caching.
|
|
"""
|
|
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Generator, List, Optional, Union
|
|
from urllib.parse import urlencode, urljoin
|
|
|
|
import requests
|
|
from requests.adapters import HTTPAdapter
|
|
from urllib3.util.retry import Retry
|
|
|
|
from config import Config, get_config
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class PaperlessAPIError(Exception):
|
|
"""Fehler bei der API-Kommunikation."""
|
|
|
|
def __init__(self, message: str, status_code: Optional[int] = None, response: Optional[dict] = None):
|
|
super().__init__(message)
|
|
self.status_code = status_code
|
|
self.response = response
|
|
|
|
|
|
class PaperlessClient:
|
|
"""Client für die Paperless-ngx REST-API."""
|
|
|
|
# API-Endpunkte
|
|
ENDPOINTS = {
|
|
'documents': '/api/documents/',
|
|
'tags': '/api/tags/',
|
|
'correspondents': '/api/correspondents/',
|
|
'document_types': '/api/document_types/',
|
|
'custom_fields': '/api/custom_fields/',
|
|
'storage_paths': '/api/storage_paths/',
|
|
}
|
|
|
|
def __init__(self, config: Optional[Config] = None, cache: Optional[Any] = None):
|
|
"""
|
|
Initialisiert den API-Client.
|
|
|
|
Args:
|
|
config: Konfigurationsobjekt. Falls None, wird globale Config verwendet.
|
|
cache: Optionales Cache-Objekt (diskcache.Cache)
|
|
"""
|
|
self.config = config or get_config()
|
|
self.base_url = self.config.paperless_url
|
|
self.token = self.config.paperless_token
|
|
self.timeout = self.config.timeout
|
|
self.cache = cache
|
|
|
|
# Session mit Retry-Logik erstellen
|
|
self.session = self._create_session()
|
|
|
|
# Cached Metadata
|
|
self._custom_fields_cache: Optional[Dict[int, dict]] = None
|
|
self._tags_cache: Optional[Dict[int, dict]] = None
|
|
self._correspondents_cache: Optional[Dict[int, dict]] = None
|
|
self._document_types_cache: Optional[Dict[int, dict]] = None
|
|
|
|
def _create_session(self) -> requests.Session:
|
|
"""Erstellt eine Session mit Retry-Konfiguration."""
|
|
session = requests.Session()
|
|
|
|
# Retry-Strategie
|
|
retry_strategy = Retry(
|
|
total=3,
|
|
backoff_factor=1,
|
|
status_forcelist=[429, 500, 502, 503, 504],
|
|
)
|
|
|
|
adapter = HTTPAdapter(max_retries=retry_strategy)
|
|
session.mount('http://', adapter)
|
|
session.mount('https://', adapter)
|
|
|
|
# Standard-Header
|
|
session.headers.update({
|
|
'Authorization': f'Token {self.token}',
|
|
'Accept': 'application/json',
|
|
'Content-Type': 'application/json',
|
|
})
|
|
|
|
return session
|
|
|
|
def _get_cache_key(self, endpoint: str, params: Optional[dict] = None) -> str:
|
|
"""Generiert einen Cache-Schlüssel."""
|
|
key_data = f"{self.base_url}{endpoint}"
|
|
if params:
|
|
key_data += json.dumps(params, sort_keys=True)
|
|
return hashlib.md5(key_data.encode()).hexdigest()
|
|
|
|
def _request(
|
|
self,
|
|
method: str,
|
|
endpoint: str,
|
|
params: Optional[dict] = None,
|
|
data: Optional[dict] = None,
|
|
use_cache: bool = True
|
|
) -> dict:
|
|
"""
|
|
Führt einen API-Request durch.
|
|
|
|
Args:
|
|
method: HTTP-Methode (GET, POST, etc.)
|
|
endpoint: API-Endpunkt (relativ zur Base-URL)
|
|
params: Query-Parameter
|
|
data: Request-Body
|
|
use_cache: Cache verwenden (nur für GET)
|
|
|
|
Returns:
|
|
API-Response als Dictionary
|
|
"""
|
|
url = urljoin(self.base_url, endpoint)
|
|
|
|
# Cache prüfen (nur GET-Requests)
|
|
if method.upper() == 'GET' and use_cache and self.cache:
|
|
cache_key = self._get_cache_key(endpoint, params)
|
|
cached = self.cache.get(cache_key)
|
|
if cached is not None:
|
|
logger.debug(f"Cache hit für {endpoint}")
|
|
return cached
|
|
|
|
logger.debug(f"API Request: {method} {url} params={params}")
|
|
|
|
try:
|
|
response = self.session.request(
|
|
method=method,
|
|
url=url,
|
|
params=params,
|
|
json=data,
|
|
timeout=self.timeout
|
|
)
|
|
|
|
response.raise_for_status()
|
|
result = response.json()
|
|
|
|
# In Cache speichern (nur GET)
|
|
if method.upper() == 'GET' and use_cache and self.cache:
|
|
self.cache.set(cache_key, result, expire=self.config.cache_ttl)
|
|
|
|
return result
|
|
|
|
except requests.exceptions.HTTPError as e:
|
|
error_msg = f"HTTP-Fehler: {e}"
|
|
try:
|
|
error_detail = e.response.json()
|
|
error_msg = f"{error_msg} - {error_detail}"
|
|
except (ValueError, AttributeError):
|
|
pass
|
|
|
|
raise PaperlessAPIError(
|
|
error_msg,
|
|
status_code=e.response.status_code if e.response else None
|
|
)
|
|
|
|
except requests.exceptions.ConnectionError as e:
|
|
raise PaperlessAPIError(f"Verbindungsfehler: Kann {self.base_url} nicht erreichen")
|
|
|
|
except requests.exceptions.Timeout as e:
|
|
raise PaperlessAPIError(f"Timeout nach {self.timeout}s")
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
raise PaperlessAPIError(f"Request-Fehler: {e}")
|
|
|
|
def _get_paginated(
|
|
self,
|
|
endpoint: str,
|
|
params: Optional[dict] = None,
|
|
page_size: int = 100
|
|
) -> Generator[dict, None, None]:
|
|
"""
|
|
Holt alle Seiten eines paginierten Endpunkts.
|
|
|
|
Args:
|
|
endpoint: API-Endpunkt
|
|
params: Zusätzliche Query-Parameter
|
|
page_size: Anzahl Ergebnisse pro Seite
|
|
|
|
Yields:
|
|
Einzelne Ergebnis-Objekte
|
|
"""
|
|
params = params or {}
|
|
params['page_size'] = page_size
|
|
page = 1
|
|
|
|
while True:
|
|
params['page'] = page
|
|
logger.debug(f"Lade Seite {page} von {endpoint}")
|
|
|
|
response = self._request('GET', endpoint, params=params)
|
|
|
|
results = response.get('results', [])
|
|
for item in results:
|
|
yield item
|
|
|
|
# Prüfen ob weitere Seiten existieren
|
|
if not response.get('next'):
|
|
break
|
|
|
|
page += 1
|
|
|
|
def test_connection(self) -> bool:
|
|
"""
|
|
Testet die Verbindung zur Paperless-API.
|
|
|
|
Returns:
|
|
True wenn Verbindung erfolgreich
|
|
"""
|
|
try:
|
|
self._request('GET', self.ENDPOINTS['tags'], params={'page_size': 1})
|
|
return True
|
|
except PaperlessAPIError:
|
|
return False
|
|
|
|
# ==================== Custom Fields ====================
|
|
|
|
def get_custom_fields(self, refresh: bool = False) -> Dict[int, dict]:
|
|
"""
|
|
Holt alle Custom Field Definitionen.
|
|
|
|
Args:
|
|
refresh: Cache ignorieren und neu laden
|
|
|
|
Returns:
|
|
Dictionary mit Field-ID als Key und Definition als Value
|
|
"""
|
|
if self._custom_fields_cache is not None and not refresh:
|
|
return self._custom_fields_cache
|
|
|
|
fields = {}
|
|
for field in self._get_paginated(self.ENDPOINTS['custom_fields']):
|
|
fields[field['id']] = field
|
|
|
|
self._custom_fields_cache = fields
|
|
logger.info(f"Geladen: {len(fields)} Custom Fields")
|
|
return fields
|
|
|
|
def get_custom_field_by_name(self, name: str) -> Optional[dict]:
|
|
"""
|
|
Findet ein Custom Field anhand des Namens.
|
|
|
|
Args:
|
|
name: Name des Custom Fields
|
|
|
|
Returns:
|
|
Field-Definition oder None
|
|
"""
|
|
fields = self.get_custom_fields()
|
|
for field in fields.values():
|
|
if field['name'].lower() == name.lower():
|
|
return field
|
|
return None
|
|
|
|
# ==================== Tags ====================
|
|
|
|
def get_tags(self, refresh: bool = False) -> Dict[int, dict]:
|
|
"""
|
|
Holt alle Tags.
|
|
|
|
Returns:
|
|
Dictionary mit Tag-ID als Key
|
|
"""
|
|
if self._tags_cache is not None and not refresh:
|
|
return self._tags_cache
|
|
|
|
tags = {}
|
|
for tag in self._get_paginated(self.ENDPOINTS['tags']):
|
|
tags[tag['id']] = tag
|
|
|
|
self._tags_cache = tags
|
|
logger.info(f"Geladen: {len(tags)} Tags")
|
|
return tags
|
|
|
|
def get_tag_by_name(self, name: str) -> Optional[dict]:
|
|
"""Findet einen Tag anhand des Namens."""
|
|
tags = self.get_tags()
|
|
for tag in tags.values():
|
|
if tag['name'].lower() == name.lower():
|
|
return tag
|
|
return None
|
|
|
|
def get_tag_id(self, name: str) -> Optional[int]:
|
|
"""Holt die ID eines Tags anhand des Namens."""
|
|
tag = self.get_tag_by_name(name)
|
|
return tag['id'] if tag else None
|
|
|
|
# ==================== Correspondents ====================
|
|
|
|
def get_correspondents(self, refresh: bool = False) -> Dict[int, dict]:
|
|
"""
|
|
Holt alle Korrespondenten.
|
|
|
|
Returns:
|
|
Dictionary mit Correspondent-ID als Key
|
|
"""
|
|
if self._correspondents_cache is not None and not refresh:
|
|
return self._correspondents_cache
|
|
|
|
correspondents = {}
|
|
for corr in self._get_paginated(self.ENDPOINTS['correspondents']):
|
|
correspondents[corr['id']] = corr
|
|
|
|
self._correspondents_cache = correspondents
|
|
logger.info(f"Geladen: {len(correspondents)} Korrespondenten")
|
|
return correspondents
|
|
|
|
def get_correspondent_name(self, correspondent_id: int) -> str:
|
|
"""Holt den Namen eines Korrespondenten."""
|
|
correspondents = self.get_correspondents()
|
|
corr = correspondents.get(correspondent_id)
|
|
return corr['name'] if corr else f"Unbekannt ({correspondent_id})"
|
|
|
|
# ==================== Document Types ====================
|
|
|
|
def get_document_types(self, refresh: bool = False) -> Dict[int, dict]:
|
|
"""Holt alle Dokumenttypen."""
|
|
if self._document_types_cache is not None and not refresh:
|
|
return self._document_types_cache
|
|
|
|
doc_types = {}
|
|
for dt in self._get_paginated(self.ENDPOINTS['document_types']):
|
|
doc_types[dt['id']] = dt
|
|
|
|
self._document_types_cache = doc_types
|
|
return doc_types
|
|
|
|
# ==================== Documents ====================
|
|
|
|
def get_documents(
|
|
self,
|
|
tags: Optional[List[Union[int, str]]] = None,
|
|
correspondent: Optional[Union[int, str]] = None,
|
|
document_type: Optional[Union[int, str]] = None,
|
|
year: Optional[int] = None,
|
|
month: Optional[int] = None,
|
|
date_from: Optional[datetime] = None,
|
|
date_to: Optional[datetime] = None,
|
|
query: Optional[str] = None,
|
|
ordering: str = '-archive_date',
|
|
**extra_filters
|
|
) -> List[dict]:
|
|
"""
|
|
Holt Dokumente mit optionalen Filtern.
|
|
|
|
Args:
|
|
tags: Liste von Tag-IDs oder Namen
|
|
correspondent: Korrespondent-ID oder Name
|
|
document_type: Dokumenttyp-ID oder Name
|
|
year: Jahr (für archive_date)
|
|
month: Monat (1-12, nur zusammen mit year)
|
|
date_from: Startdatum
|
|
date_to: Enddatum
|
|
query: Volltextsuche
|
|
ordering: Sortierung
|
|
**extra_filters: Zusätzliche Filter für die API
|
|
|
|
Returns:
|
|
Liste von Dokumenten
|
|
"""
|
|
params = {'ordering': ordering}
|
|
|
|
# Tags verarbeiten
|
|
if tags:
|
|
tag_ids = []
|
|
for tag in tags:
|
|
if isinstance(tag, int):
|
|
tag_ids.append(tag)
|
|
else:
|
|
tag_id = self.get_tag_id(tag)
|
|
if tag_id:
|
|
tag_ids.append(tag_id)
|
|
else:
|
|
logger.warning(f"Tag nicht gefunden: {tag}")
|
|
|
|
if tag_ids:
|
|
params['tags__id__in'] = ','.join(str(t) for t in tag_ids)
|
|
|
|
# Korrespondent
|
|
if correspondent:
|
|
if isinstance(correspondent, str):
|
|
correspondents = self.get_correspondents()
|
|
for c in correspondents.values():
|
|
if c['name'].lower() == correspondent.lower():
|
|
params['correspondent__id'] = c['id']
|
|
break
|
|
else:
|
|
params['correspondent__id'] = correspondent
|
|
|
|
# Dokumenttyp
|
|
if document_type:
|
|
if isinstance(document_type, str):
|
|
doc_types = self.get_document_types()
|
|
for dt in doc_types.values():
|
|
if dt['name'].lower() == document_type.lower():
|
|
params['document_type__id'] = dt['id']
|
|
break
|
|
else:
|
|
params['document_type__id'] = document_type
|
|
|
|
# Datumsfilter
|
|
date_field = self.config.date_field
|
|
|
|
if year:
|
|
if month:
|
|
# Spezifischer Monat
|
|
if month == 12:
|
|
next_year = year + 1
|
|
next_month = 1
|
|
else:
|
|
next_year = year
|
|
next_month = month + 1
|
|
|
|
params[f'{date_field}__gte'] = f'{year}-{month:02d}-01'
|
|
params[f'{date_field}__lt'] = f'{next_year}-{next_month:02d}-01'
|
|
else:
|
|
# Ganzes Jahr
|
|
params[f'{date_field}__year'] = year
|
|
|
|
if date_from:
|
|
params[f'{date_field}__gte'] = date_from.strftime('%Y-%m-%d')
|
|
|
|
if date_to:
|
|
params[f'{date_field}__lte'] = date_to.strftime('%Y-%m-%d')
|
|
|
|
# Volltextsuche
|
|
if query:
|
|
params['query'] = query
|
|
|
|
# Extra-Filter
|
|
params.update(extra_filters)
|
|
|
|
# Alle Dokumente abrufen
|
|
documents = list(self._get_paginated(self.ENDPOINTS['documents'], params))
|
|
logger.info(f"Geladen: {len(documents)} Dokumente")
|
|
|
|
return documents
|
|
|
|
def get_document(self, document_id: int) -> dict:
|
|
"""
|
|
Holt ein einzelnes Dokument.
|
|
|
|
Args:
|
|
document_id: ID des Dokuments
|
|
|
|
Returns:
|
|
Dokument-Dictionary
|
|
"""
|
|
endpoint = f"{self.ENDPOINTS['documents']}{document_id}/"
|
|
return self._request('GET', endpoint)
|
|
|
|
def get_document_url(self, document_id: int) -> str:
|
|
"""Generiert die Web-URL für ein Dokument."""
|
|
return f"{self.base_url}/documents/{document_id}/details"
|
|
|
|
def get_document_download_url(self, document_id: int) -> str:
|
|
"""Generiert die Download-URL für ein Dokument."""
|
|
return f"{self.base_url}/api/documents/{document_id}/download/"
|
|
|
|
# ==================== Hilfsmethoden ====================
|
|
|
|
def resolve_all_metadata(self, documents: List[dict]) -> List[dict]:
|
|
"""
|
|
Erweitert Dokumente um aufgelöste Metadaten (Tag-Namen, Korrespondent-Namen, etc.).
|
|
|
|
Args:
|
|
documents: Liste von Dokumenten
|
|
|
|
Returns:
|
|
Erweiterte Dokumente
|
|
"""
|
|
tags = self.get_tags()
|
|
correspondents = self.get_correspondents()
|
|
doc_types = self.get_document_types()
|
|
custom_fields = self.get_custom_fields()
|
|
|
|
for doc in documents:
|
|
# Tag-Namen
|
|
doc['tag_names'] = [
|
|
tags.get(tid, {}).get('name', f'Unknown-{tid}')
|
|
for tid in doc.get('tags', [])
|
|
]
|
|
|
|
# Korrespondent-Name
|
|
corr_id = doc.get('correspondent')
|
|
doc['correspondent_name'] = (
|
|
correspondents.get(corr_id, {}).get('name', '')
|
|
if corr_id else ''
|
|
)
|
|
|
|
# Dokumenttyp-Name
|
|
dt_id = doc.get('document_type')
|
|
doc['document_type_name'] = (
|
|
doc_types.get(dt_id, {}).get('name', '')
|
|
if dt_id else ''
|
|
)
|
|
|
|
# Custom Fields aufbereiten
|
|
doc['custom_fields_resolved'] = {}
|
|
for cf in doc.get('custom_fields', []):
|
|
field_id = cf.get('field')
|
|
field_def = custom_fields.get(field_id, {})
|
|
field_name = field_def.get('name', f'field_{field_id}')
|
|
doc['custom_fields_resolved'][field_name] = {
|
|
'value': cf.get('value'),
|
|
'type': field_def.get('data_type', 'string'),
|
|
'field_id': field_id
|
|
}
|
|
|
|
# URL hinzufügen
|
|
doc['web_url'] = self.get_document_url(doc['id'])
|
|
|
|
return documents
|
|
|
|
def get_statistics(self) -> dict:
|
|
"""
|
|
Holt allgemeine Statistiken.
|
|
|
|
Returns:
|
|
Dictionary mit Statistiken
|
|
"""
|
|
return {
|
|
'total_documents': len(list(self._get_paginated(
|
|
self.ENDPOINTS['documents'],
|
|
params={'page_size': 1}
|
|
))),
|
|
'total_tags': len(self.get_tags()),
|
|
'total_correspondents': len(self.get_correspondents()),
|
|
'total_custom_fields': len(self.get_custom_fields()),
|
|
}
|