From d2dd837f26a55ce81a6c263e7ab10ab53eec9c91 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 7 Dec 2025 10:09:10 +0000 Subject: [PATCH] Add Paperless Finance Report Tool - Complete implementation A Python CLI tool for generating financial reports from Paperless-ngx: - Phase 1 (MVP): Config handling, Paperless API client with auth and pagination, custom fields extraction, tag-based summation, CLI output - Phase 2 (Grouping): Multiple grouping criteria (tag, correspondent, category, payment type, month, quarter, year), percentage distribution - Phase 3 (Reports): HTML reports with Chart.js diagrams (doughnut, bar, line charts), PDF export via WeasyPrint, JSON and CSV export - Phase 4 (Comfort): Automatic tag ID resolution, disk caching with diskcache, colorized logging, comprehensive error handling Features: - Flexible date filtering (year, month, date range) - Period comparison with change analysis - Swiss franc formatting (CHF with apostrophe separators) - Interactive HTML reports with sortable tables and document links - Multiple output formats (CLI, HTML, PDF, JSON, CSV) --- paperless-report/.gitignore | 41 ++ paperless-report/README.md | 198 ++++++ paperless-report/__init__.py | 24 + paperless-report/config.py | 269 ++++++++ paperless-report/config.yaml.example | 78 +++ paperless-report/extractor.py | 592 +++++++++++++++++ paperless-report/main.py | 489 ++++++++++++++ paperless-report/output/.gitkeep | 1 + paperless-report/paperless_client.py | 537 ++++++++++++++++ paperless-report/report_generator.py | 628 ++++++++++++++++++ paperless-report/requirements.txt | 31 + paperless-report/setup.py | 88 +++ paperless-report/templates/report.html | 848 +++++++++++++++++++++++++ 13 files changed, 3824 insertions(+) create mode 100644 paperless-report/.gitignore create mode 100644 paperless-report/README.md create mode 100644 paperless-report/__init__.py create mode 100644 paperless-report/config.py create mode 100644 paperless-report/config.yaml.example create mode 
100644 paperless-report/extractor.py create mode 100644 paperless-report/main.py create mode 100644 paperless-report/output/.gitkeep create mode 100644 paperless-report/paperless_client.py create mode 100644 paperless-report/report_generator.py create mode 100644 paperless-report/requirements.txt create mode 100644 paperless-report/setup.py create mode 100644 paperless-report/templates/report.html diff --git a/paperless-report/.gitignore b/paperless-report/.gitignore new file mode 100644 index 0000000..3e01afa --- /dev/null +++ b/paperless-report/.gitignore @@ -0,0 +1,41 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# Virtual environment +venv/ +env/ +.venv/ + +# Config with secrets +config.yaml + +# Cache +.cache/ + +# Output files +output/*.html +output/*.pdf +output/*.json +output/*.csv + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# Distribution / packaging +dist/ +build/ +*.egg-info/ + +# Testing +.pytest_cache/ +.coverage +htmlcov/ + +# Misc +.DS_Store +*.log diff --git a/paperless-report/README.md b/paperless-report/README.md new file mode 100644 index 0000000..8680b05 --- /dev/null +++ b/paperless-report/README.md @@ -0,0 +1,198 @@ +# Paperless Finance Report + +Ein Python-basiertes CLI-Tool, das über die Paperless-ngx REST-API Dokumente abruft, Beträge und Custom Fields extrahiert und daraus Finanzberichte generiert. 
+ +## Features + +- **Basis-Auswertung**: Summe aller Beträge nach Tags, Kategorien, Korrespondenten +- **Zeiträume**: Filter nach Jahr, Monat oder beliebigem Datumsbereich +- **Gruppierung**: Nach Tag, Kategorie, Korrespondent, Zahlungsart, Monat, Quartal +- **Vergleichsberichte**: Jahresvergleiche mit Veränderungsanalyse +- **Mehrere Ausgabeformate**: CLI, HTML (mit Chart.js Diagrammen), PDF, JSON, CSV +- **Caching**: Optionaler Festplatten-Cache für bessere Performance +- **Flexibel**: Konfigurierbare Custom Field Namen + +## Installation + +### Voraussetzungen + +- Python 3.8+ +- Paperless-ngx Installation mit REST-API Zugriff +- API-Token (erstellen unter: Paperless → Einstellungen → Authentifizierungs-Tokens) + +### Installation + +```bash +# Repository klonen +git clone https://github.com/yourusername/paperless-report.git +cd paperless-report + +# Virtuelle Umgebung erstellen +python3 -m venv venv +source venv/bin/activate # Linux/macOS +# oder: venv\Scripts\activate # Windows + +# Dependencies installieren +pip install -r requirements.txt + +# Optional: Vollinstallation mit PDF-Support +pip install -e ".[full]" +``` + +### Konfiguration + +```bash +# Beispiel-Konfiguration erstellen +cp config.yaml.example config.yaml + +# Konfiguration anpassen +nano config.yaml +``` + +Mindestens erforderlich: +```yaml +paperless: + url: "http://localhost:8000" # Deine Paperless URL + token: "YOUR_API_TOKEN" # API Token +``` + +Alternativ kann der Token auch als Umgebungsvariable gesetzt werden: +```bash +export PAPERLESS_TOKEN="your_api_token" +``` + +## Verwendung + +### Verbindung testen + +```bash +python main.py test +``` + +### Jahresbericht + +```bash +# CLI-Ausgabe +python main.py report --year 2024 + +# Mit Details +python main.py report --year 2024 --detail + +# HTML-Bericht +python main.py report --year 2024 --format html + +# PDF-Bericht +python main.py report --year 2024 --format pdf +``` + +### Mit Filtern + +```bash +# Nach Tag filtern +python main.py 
report --year 2024 --tag rechnung + +# Nach Korrespondent filtern +python main.py report --year 2024 --correspondent "Swisscom" + +# Nach Monat filtern +python main.py report --year 2024 --month 6 +``` + +### Gruppierung + +```bash +# Nach Tag gruppieren (Standard) +python main.py report --year 2024 --group-by tag + +# Nach Korrespondent gruppieren +python main.py report --year 2024 --group-by correspondent + +# Nach Kategorie und Monat gruppieren +python main.py report --year 2024 --group-by category --group-by month +``` + +### Jahresvergleich + +```bash +# CLI-Vergleich +python main.py compare 2023 2024 + +# HTML-Vergleichsbericht +python main.py compare 2023 2024 --format html +``` + +### Weitere Befehle + +```bash +# Dokumente auflisten +python main.py list-docs --tag rechnung --limit 50 + +# Cache löschen +python main.py clear-cache + +# Hilfe anzeigen +python main.py --help +python main.py report --help +``` + +## Custom Fields in Paperless + +Für die volle Funktionalität sollten folgende Custom Fields in Paperless angelegt werden: + +| Feldname | Typ | Beschreibung | +|-----------------|----------|---------------------------------------| +| `betrag` | Währung | Rechnungsbetrag | +| `rechnungsdatum`| Datum | Datum der Rechnung | +| `kategorie` | Auswahl | Wohnen, Gesundheit, Mobilität, etc. | +| `zahlungsart` | Auswahl | Bar, Einzahlung, LSV, eBill | + +Die Feldnamen können in der `config.yaml` angepasst werden. + +## Ausgabeformate + +### CLI + +Einfache tabellarische Ausgabe im Terminal. + +### HTML + +Interaktiver Bericht mit: +- Zusammenfassungskarten +- Chart.js Diagramme (Doughnut, Bar, Line) +- Sortierbare Tabellen +- Links zu Paperless-Dokumenten +- Export-Button für CSV + +### PDF + +Druckfertiger PDF-Bericht (benötigt WeasyPrint). + +### JSON + +Maschinenlesbares Format für weitere Verarbeitung. + +### CSV + +Excel-kompatibles Format mit BOM für korrekte Umlaute. 
+ +## Projektstruktur + +``` +paperless-report/ +├── config.yaml.example # Beispiel-Konfiguration +├── config.py # Konfigurationsmanagement +├── paperless_client.py # API-Client +├── extractor.py # Datenextraktion und -aggregation +├── report_generator.py # Berichtsgenerierung +├── main.py # CLI-Einstiegspunkt +├── templates/ +│ └── report.html # HTML-Template +├── output/ # Generierte Berichte +├── requirements.txt +├── setup.py +└── README.md +``` + +## Lizenz + +MIT License diff --git a/paperless-report/__init__.py b/paperless-report/__init__.py new file mode 100644 index 0000000..a96d9f1 --- /dev/null +++ b/paperless-report/__init__.py @@ -0,0 +1,24 @@ +""" +Paperless Finance Report Tool + +Generiert Finanzberichte aus Paperless-ngx Dokumenten. +""" + +__version__ = '1.0.0' +__author__ = 'Your Name' + +from config import Config, get_config +from paperless_client import PaperlessClient, PaperlessAPIError +from extractor import DocumentExtractor, DataAggregator, FinanceDocument +from report_generator import ReportGenerator + +__all__ = [ + 'Config', + 'get_config', + 'PaperlessClient', + 'PaperlessAPIError', + 'DocumentExtractor', + 'DataAggregator', + 'FinanceDocument', + 'ReportGenerator', +] diff --git a/paperless-report/config.py b/paperless-report/config.py new file mode 100644 index 0000000..e3d266c --- /dev/null +++ b/paperless-report/config.py @@ -0,0 +1,269 @@ +""" +Konfigurationsmanagement für das Paperless Finance Report Tool. + +Lädt und validiert die YAML-Konfiguration. 
class Config:
    """Configuration for the Paperless Finance Report Tool.

    Built-in defaults are merged with an optional YAML file. Values are read
    with dot notation (``config.get('paperless.url')``), with item access
    (``config['paperless.url']``) or through the convenience properties.
    """

    DEFAULT_CONFIG = {
        'paperless': {
            'url': 'http://localhost:8000',
            'token': '',
            'timeout': 30,
        },
        'custom_fields': {
            'betrag': 'betrag',
            'rechnungsdatum': 'rechnungsdatum',
            'kategorie': 'kategorie',
            'zahlungsart': 'zahlungsart',
            'periode': 'periode',
            'notiz': 'notiz',
        },
        'defaults': {
            'currency': 'CHF',
            'date_field': 'archive_date',
            'invoice_tag': 'rechnung',
        },
        'tags': ['rechnung'],
        'categories': [],
        'output': {
            'format': 'html',
            'path': './output',
            'filename_pattern': 'finanzbericht_{year}',
        },
        'cache': {
            'enabled': True,
            'path': './.cache',
            'ttl': 3600,
        },
        'logging': {
            'level': 'INFO',
            'file': '',
            'colorize': True,
        },
    }

    def __init__(self, config_path: Optional[str] = None):
        """Load defaults, merge the config file (if any) and validate.

        Args:
            config_path: Path to ``config.yaml``. If ``None``, the file named
                by ``PAPERLESS_REPORT_CONFIG``, the current directory, the
                script directory and ``~/.config/paperless-report`` are
                searched in that order.

        Raises:
            ConfigError: If an explicit path does not exist, the file cannot
                be read or parsed, or URL/token are missing.
        """
        import copy  # local import keeps this block self-contained

        # Deep copy: a shallow copy shares the nested section dicts with the
        # class attribute, so writing the token in _validate_config() would
        # leak it into DEFAULT_CONFIG and into every later instance.
        self._config = copy.deepcopy(self.DEFAULT_CONFIG)
        self._config_path = self._find_config(config_path)

        if self._config_path:
            self._load_config()

        self._validate_config()

    def _find_config(self, config_path: Optional[str]) -> Optional[Path]:
        """Return the configuration file to use, or ``None`` if none exists.

        Raises:
            ConfigError: If ``config_path`` is given but does not exist.
        """
        if config_path:
            path = Path(config_path)
            if path.exists():
                return path
            raise ConfigError(f"Konfigurationsdatei nicht gefunden: {config_path}")

        search_paths = [
            Path.cwd() / 'config.yaml',
            Path.cwd() / 'config.yml',
            Path(__file__).parent / 'config.yaml',
            Path(__file__).parent / 'config.yml',
            Path.home() / '.config' / 'paperless-report' / 'config.yaml',
        ]

        # The environment variable takes precedence over the default locations.
        env_path = os.environ.get('PAPERLESS_REPORT_CONFIG')
        if env_path:
            search_paths.insert(0, Path(env_path))

        for path in search_paths:
            if path.exists():
                return path

        return None

    def _load_config(self) -> None:
        """Read the YAML file and merge it over the current configuration.

        Raises:
            ConfigError: If the file cannot be read, is not valid YAML, or
                its top level is not a mapping.
        """
        try:
            with open(self._config_path, 'r', encoding='utf-8') as f:
                user_config = yaml.safe_load(f) or {}
        except yaml.YAMLError as e:
            raise ConfigError(f"Fehler beim Parsen der Konfiguration: {e}") from e
        except IOError as e:
            raise ConfigError(f"Fehler beim Lesen der Konfiguration: {e}") from e

        # A YAML document whose root is a list or scalar cannot be merged.
        if not isinstance(user_config, dict):
            raise ConfigError(
                "Fehler beim Parsen der Konfiguration: "
                "oberste Ebene muss ein Mapping sein"
            )

        self._config = self._deep_merge(self._config, user_config)

    def _deep_merge(self, base: dict, override: dict) -> dict:
        """Return a new dict with ``override`` merged recursively over ``base``.

        Neither input is modified and the result shares no mutable objects
        with them.
        """
        import copy  # local import keeps this block self-contained

        result = copy.deepcopy(base)

        for key, value in override.items():
            if isinstance(result.get(key), dict) and isinstance(value, dict):
                result[key] = self._deep_merge(result[key], value)
            else:
                result[key] = copy.deepcopy(value)

        return result

    def _validate_config(self) -> None:
        """Check that URL and token are available.

        The token may come from the ``PAPERLESS_TOKEN`` environment variable;
        in that case it is stored in this instance's configuration.

        Raises:
            ConfigError: If the URL or the token is missing.
        """
        url = self.get('paperless.url', '')
        if not url:
            raise ConfigError("Paperless URL muss konfiguriert werden")

        token = self.get('paperless.token', '') or os.environ.get('PAPERLESS_TOKEN', '')
        if not token:
            raise ConfigError(
                "Paperless API-Token muss konfiguriert werden.\n"
                "Setze 'paperless.token' in config.yaml oder die Umgebungsvariable PAPERLESS_TOKEN"
            )

        # Adopt the environment token when the file did not provide one.
        if not self.get('paperless.token'):
            self._config['paperless']['token'] = token

    def get(self, key: str, default: Any = None) -> Any:
        """Return a configuration value addressed in dot notation.

        Args:
            key: Dotted key, e.g. ``'paperless.url'``.
            default: Value returned when any path component is missing.

        Returns:
            The configured value, or ``default``.
        """
        value = self._config

        try:
            for part in key.split('.'):
                value = value[part]
        except (KeyError, TypeError):
            return default
        return value

    def __getitem__(self, key: str) -> Any:
        """Support ``config['a.b']``; raise ``KeyError`` for missing/None values."""
        value = self.get(key)
        if value is None:
            raise KeyError(key)
        return value

    @property
    def paperless_url(self) -> str:
        """Paperless base URL without trailing slash."""
        url = self.get('paperless.url', '')
        return url.rstrip('/')

    @property
    def paperless_token(self) -> str:
        """Paperless API token."""
        return self.get('paperless.token', '')

    @property
    def timeout(self) -> int:
        """Request timeout in seconds."""
        return self.get('paperless.timeout', 30)

    @property
    def currency(self) -> str:
        """Default currency."""
        return self.get('defaults.currency', 'CHF')

    @property
    def date_field(self) -> str:
        """Date field used for period filtering."""
        return self.get('defaults.date_field', 'archive_date')

    @property
    def output_format(self) -> str:
        """Default output format."""
        return self.get('output.format', 'html')

    @property
    def output_path(self) -> Path:
        """Directory for generated reports."""
        return Path(self.get('output.path', './output'))

    @property
    def cache_enabled(self) -> bool:
        """Whether the disk cache is enabled."""
        return self.get('cache.enabled', True)

    @property
    def cache_path(self) -> Path:
        """Cache directory."""
        return Path(self.get('cache.path', './.cache'))

    @property
    def cache_ttl(self) -> int:
        """Cache lifetime in seconds."""
        return self.get('cache.ttl', 3600)

    @property
    def log_level(self) -> str:
        """Log level name."""
        return self.get('logging.level', 'INFO')

    @property
    def custom_field_names(self) -> dict:
        """Mapping of internal field names to Paperless custom field names."""
        return self.get('custom_fields', {})

    def get_custom_field_name(self, internal_name: str) -> str:
        """Return the Paperless field name for an internal field name.

        Falls back to ``internal_name`` itself when no mapping is configured.
        """
        return self.get(f'custom_fields.{internal_name}', internal_name)
"betrag" + rechnungsdatum: "rechnungsdatum" + kategorie: "kategorie" + zahlungsart: "zahlungsart" + periode: "periode" + notiz: "notiz" + +# Standardeinstellungen +defaults: + # Währung für Beträge + currency: "CHF" + # Welches Datumsfeld für Zeitraumfilter verwendet werden soll + # Optionen: "archive_date", "created", "added", oder ein Custom Field Name + date_field: "archive_date" + # Standard-Tag für Rechnungen (Name, nicht ID) + invoice_tag: "rechnung" + +# Tag-Namen die automatisch erkannt werden sollen +# Die IDs werden beim ersten Start automatisch ermittelt +tags: + - rechnung + - miete + - krankenkasse + - steuern + - versicherung + - nebenkosten + +# Kategorien für Gruppierung (müssen in Paperless als Auswahl-Optionen existieren) +categories: + - Wohnen + - Gesundheit + - Mobilität + - Versicherungen + - Steuern + - Lebensmittel + - Freizeit + - Diverses + +# Ausgabe-Einstellungen +output: + # Standard-Format: html, pdf, json, cli + format: "html" + # Verzeichnis für generierte Berichte + path: "./output" + # Dateiname-Muster (Platzhalter: {year}, {month}, {date}, {timestamp}) + filename_pattern: "finanzbericht_{year}" + +# Cache-Einstellungen +cache: + # Cache aktivieren + enabled: true + # Cache-Verzeichnis + path: "./.cache" + # Cache-Gültigkeit in Sekunden (Standard: 1 Stunde) + ttl: 3600 + +# Logging-Einstellungen +logging: + # Log-Level: DEBUG, INFO, WARNING, ERROR + level: "INFO" + # Log-Datei (leer = nur Konsole) + file: "" + # Farbige Ausgabe + colorize: true diff --git a/paperless-report/extractor.py b/paperless-report/extractor.py new file mode 100644 index 0000000..9ef2024 --- /dev/null +++ b/paperless-report/extractor.py @@ -0,0 +1,592 @@ +""" +Daten-Extraktion und Aggregation für das Paperless Finance Report Tool. + +Extrahiert Custom Fields aus Dokumenten und aggregiert die Daten +für verschiedene Gruppierungen. 
+""" + +import logging +import re +from collections import defaultdict +from dataclasses import dataclass, field +from datetime import datetime +from decimal import Decimal, InvalidOperation +from typing import Any, Callable, Dict, List, Optional, Tuple, Union + +from dateutil.parser import parse as parse_date + +from config import Config, get_config +from paperless_client import PaperlessClient + +logger = logging.getLogger(__name__) + + +@dataclass +class FinanceDocument: + """Ein aufbereitetes Finanzdokument.""" + + id: int + title: str + archive_date: Optional[datetime] = None + created: Optional[datetime] = None + added: Optional[datetime] = None + + # Paperless Metadata + correspondent: Optional[str] = None + correspondent_id: Optional[int] = None + document_type: Optional[str] = None + tags: List[str] = field(default_factory=list) + tag_ids: List[int] = field(default_factory=list) + + # Custom Fields + betrag: Optional[Decimal] = None + rechnungsdatum: Optional[datetime] = None + kategorie: Optional[str] = None + zahlungsart: Optional[str] = None + periode: Optional[str] = None + notiz: Optional[str] = None + + # URLs + web_url: Optional[str] = None + + # Original-Daten + raw_data: Dict = field(default_factory=dict) + + @property + def effective_date(self) -> Optional[datetime]: + """Das effektive Datum (Rechnungsdatum oder Archivdatum).""" + return self.rechnungsdatum or self.archive_date + + @property + def year(self) -> Optional[int]: + """Jahr des effektiven Datums.""" + date = self.effective_date + return date.year if date else None + + @property + def month(self) -> Optional[int]: + """Monat des effektiven Datums.""" + date = self.effective_date + return date.month if date else None + + @property + def month_year(self) -> Optional[str]: + """Monat/Jahr als String (z.B. '2024-01').""" + date = self.effective_date + return date.strftime('%Y-%m') if date else None + + @property + def quarter(self) -> Optional[str]: + """Quartal als String (z.B. 
'Q1 2024').""" + date = self.effective_date + if not date: + return None + q = (date.month - 1) // 3 + 1 + return f"Q{q} {date.year}" + + +class DocumentExtractor: + """Extrahiert und verarbeitet Dokumente aus Paperless.""" + + def __init__(self, client: PaperlessClient, config: Optional[Config] = None): + """ + Initialisiert den Extractor. + + Args: + client: Paperless API Client + config: Konfiguration + """ + self.client = client + self.config = config or get_config() + self._custom_fields_map: Dict[str, int] = {} + + def _build_custom_fields_map(self) -> None: + """Baut ein Mapping von Feldnamen zu IDs.""" + if self._custom_fields_map: + return + + fields = self.client.get_custom_fields() + for field_id, field_def in fields.items(): + name = field_def['name'].lower() + self._custom_fields_map[name] = field_id + + def _parse_decimal(self, value: Any) -> Optional[Decimal]: + """ + Parst einen Wert zu Decimal. + + Verarbeitet verschiedene Formate: + - 1234.56 + - 1234,56 + - 1'234.56 (Schweizer Format) + - CHF 1234.56 + """ + if value is None: + return None + + if isinstance(value, (int, float)): + return Decimal(str(value)) + + if isinstance(value, Decimal): + return value + + if not isinstance(value, str): + return None + + # String bereinigen + value = value.strip() + + # Währungssymbole entfernen + value = re.sub(r'^(CHF|EUR|USD|Fr\.?)\s*', '', value, flags=re.IGNORECASE) + value = re.sub(r'\s*(CHF|EUR|USD|Fr\.?)$', '', value, flags=re.IGNORECASE) + + # Tausender-Trennzeichen entfernen (Apostroph, Punkt als Tausender) + # Schweizer Format: 1'234.56 oder 1'234,56 + if "'" in value: + value = value.replace("'", "") + + # Deutsches/Schweizer Format mit Punkt als Tausender: 1.234,56 + if re.match(r'^\d{1,3}(\.\d{3})+,\d{2}$', value): + value = value.replace(".", "").replace(",", ".") + # Komma als Dezimaltrennzeichen ohne Tausender + elif "," in value and "." 
not in value: + value = value.replace(",", ".") + + try: + return Decimal(value) + except InvalidOperation: + logger.warning(f"Konnte Betrag nicht parsen: {value}") + return None + + def _parse_date(self, value: Any) -> Optional[datetime]: + """Parst einen Wert zu datetime.""" + if value is None: + return None + + if isinstance(value, datetime): + return value + + if not isinstance(value, str): + return None + + try: + return parse_date(value) + except (ValueError, TypeError): + logger.warning(f"Konnte Datum nicht parsen: {value}") + return None + + def _get_custom_field_value(self, doc: dict, field_name: str) -> Any: + """Holt den Wert eines Custom Fields aus einem Dokument.""" + # Aus resolved fields + resolved = doc.get('custom_fields_resolved', {}) + if field_name in resolved: + return resolved[field_name].get('value') + + # Aus rohen custom_fields + self._build_custom_fields_map() + field_name_lower = field_name.lower() + + for cf in doc.get('custom_fields', []): + field_id = cf.get('field') + # Prüfen ob ID zum gesuchten Feldnamen passt + for name, fid in self._custom_fields_map.items(): + if fid == field_id and name == field_name_lower: + return cf.get('value') + + return None + + def extract_document(self, raw_doc: dict) -> FinanceDocument: + """ + Extrahiert ein aufbereitetes FinanceDocument aus den Rohdaten. 
+ + Args: + raw_doc: Rohes Dokument-Dictionary von der API + + Returns: + FinanceDocument-Instanz + """ + # Custom Field Namen aus Config + cf_names = self.config.custom_field_names + + # Basis-Daten + doc = FinanceDocument( + id=raw_doc['id'], + title=raw_doc.get('title', ''), + raw_data=raw_doc + ) + + # Datums-Felder + doc.archive_date = self._parse_date(raw_doc.get('archive_date')) + doc.created = self._parse_date(raw_doc.get('created')) + doc.added = self._parse_date(raw_doc.get('added')) + + # Korrespondent + doc.correspondent_id = raw_doc.get('correspondent') + doc.correspondent = raw_doc.get('correspondent_name', '') + + # Dokumenttyp + doc.document_type = raw_doc.get('document_type_name', '') + + # Tags + doc.tag_ids = raw_doc.get('tags', []) + doc.tags = raw_doc.get('tag_names', []) + + # URL + doc.web_url = raw_doc.get('web_url', '') + + # Custom Fields + betrag_name = cf_names.get('betrag', 'betrag') + doc.betrag = self._parse_decimal( + self._get_custom_field_value(raw_doc, betrag_name) + ) + + datum_name = cf_names.get('rechnungsdatum', 'rechnungsdatum') + doc.rechnungsdatum = self._parse_date( + self._get_custom_field_value(raw_doc, datum_name) + ) + + kat_name = cf_names.get('kategorie', 'kategorie') + doc.kategorie = self._get_custom_field_value(raw_doc, kat_name) + + zahl_name = cf_names.get('zahlungsart', 'zahlungsart') + doc.zahlungsart = self._get_custom_field_value(raw_doc, zahl_name) + + periode_name = cf_names.get('periode', 'periode') + doc.periode = self._get_custom_field_value(raw_doc, periode_name) + + notiz_name = cf_names.get('notiz', 'notiz') + doc.notiz = self._get_custom_field_value(raw_doc, notiz_name) + + return doc + + def extract_documents(self, raw_docs: List[dict]) -> List[FinanceDocument]: + """ + Extrahiert mehrere Dokumente. 
@dataclass
class GroupStats:
    """Totals for one group (a tag, correspondent, category, period, ...)."""

    # Display label of the group
    name: str
    # Sum of the amounts of all documents in the group
    amount: Decimal = Decimal('0')
    # Number of documents in the group
    count: int = 0
    # Share of the overall total in percent
    percentage: float = 0.0
    # Documents that were counted into this group
    documents: List['FinanceDocument'] = field(default_factory=list)

    @property
    def amount_formatted(self) -> str:
        """Amount with two decimals and apostrophes as thousands separators."""
        grouped = format(self.amount, ',.2f')
        return "'".join(grouped.split(','))
+ + Args: + config: Konfiguration + """ + self.config = config or get_config() + + def aggregate( + self, + documents: List[FinanceDocument], + group_by: Optional[List[str]] = None + ) -> AggregationResult: + """ + Aggregiert Dokumente. + + Args: + documents: Liste von Dokumenten + group_by: Liste von Gruppierungskriterien: + 'tag', 'correspondent', 'category', 'payment_type', + 'month', 'quarter', 'year' + + Returns: + AggregationResult mit allen Statistiken + """ + result = AggregationResult() + result.documents = documents + result.document_count = len(documents) + + # Beträge sammeln + amounts: List[Decimal] = [] + + for doc in documents: + if doc.betrag is not None: + result.total_amount += doc.betrag + result.documents_with_amount += 1 + amounts.append(doc.betrag) + else: + result.documents_without_amount += 1 + + # Basis-Statistiken + if amounts: + amounts_sorted = sorted(amounts) + result.min_amount = amounts_sorted[0] + result.max_amount = amounts_sorted[-1] + result.average_amount = result.total_amount / len(amounts) + + # Median + mid = len(amounts_sorted) // 2 + if len(amounts_sorted) % 2 == 0: + result.median_amount = (amounts_sorted[mid - 1] + amounts_sorted[mid]) / 2 + else: + result.median_amount = amounts_sorted[mid] + + # Top-Posten + docs_with_amount = [d for d in documents if d.betrag is not None] + result.top_items = sorted( + docs_with_amount, + key=lambda d: d.betrag or Decimal('0'), + reverse=True + )[:10] + + # Gruppierungen + group_by = group_by or ['tag', 'correspondent', 'category', 'month'] + + if 'tag' in group_by: + result.by_tag = self._group_by_tags(documents, result.total_amount) + + if 'correspondent' in group_by: + result.by_correspondent = self._group_by_field( + documents, 'correspondent', result.total_amount + ) + + if 'category' in group_by: + result.by_category = self._group_by_field( + documents, 'kategorie', result.total_amount + ) + + if 'payment_type' in group_by: + result.by_payment_type = self._group_by_field( + 
def _group_by_field(
    self,
    documents: List[FinanceDocument],
    field: str,
    total: Decimal
) -> Dict[str, GroupStats]:
    """Group documents by a single attribute of ``FinanceDocument``.

    Args:
        documents: Documents to group.
        field: Attribute name read from each document, e.g.
            ``'correspondent'``, ``'month_year'`` or ``'quarter'``.
        total: Overall amount used as the base for the percentages.

    Returns:
        Mapping of group label to ``GroupStats``. Documents whose attribute
        is ``None`` or empty go into ``'Nicht zugeordnet'``. ``month_year``
        and ``quarter`` groups are ordered chronologically with the
        unassigned bucket last; all other groupings are ordered by amount,
        largest first.
    """
    groups: Dict[str, GroupStats] = {}

    for doc in documents:
        value = getattr(doc, field, None)
        key = 'Nicht zugeordnet' if value is None or value == '' else str(value)

        stats = groups.get(key)
        if stats is None:
            stats = groups[key] = GroupStats(name=key)

        stats.count += 1
        if doc.betrag:
            stats.amount += doc.betrag
        stats.documents.append(doc)

    # Percentages are only defined against a positive total.
    if total > 0:
        for stats in groups.values():
            stats.percentage = float(stats.amount / total * 100)

    if field == 'month_year':
        # 'YYYY-MM' labels already sort chronologically as plain strings.
        return dict(sorted(groups.items()))

    if field == 'quarter':
        # 'Q1 2024' must not be sorted as text: that would place it before
        # 'Q2 2023'. Order by (year, quarter) instead.
        def chronological(item):
            parsed = re.fullmatch(r'Q([1-4]) (\d+)', item[0])
            if parsed is None:
                return (1, 0, 0)  # unassigned bucket goes last
            return (0, int(parsed.group(2)), int(parsed.group(1)))

        return dict(sorted(groups.items(), key=chronological))

    return dict(sorted(
        groups.items(),
        key=lambda item: item[1].amount,
        reverse=True
    ))
stats1.amount == 0 else ( + 'removed' if stats2.amount == 0 else 'changed' + ) + } + + return { + 'period1': { + 'name': str(period1), + 'total': agg1.total_amount, + 'count': agg1.document_count, + 'aggregation': agg1, + }, + 'period2': { + 'name': str(period2), + 'total': agg2.total_amount, + 'count': agg2.document_count, + 'aggregation': agg2, + }, + 'diff_absolute': diff_absolute, + 'diff_percent': diff_percent, + 'category_comparison': category_comparison, + } diff --git a/paperless-report/main.py b/paperless-report/main.py new file mode 100644 index 0000000..241bd19 --- /dev/null +++ b/paperless-report/main.py @@ -0,0 +1,489 @@ +#!/usr/bin/env python3 +""" +Paperless Finance Report Tool + +CLI-Einstiegspunkt für das Paperless Finanz-Auswertungstool. +Generiert Finanzberichte aus Paperless-ngx Dokumenten. +""" + +import logging +import sys +from datetime import datetime +from pathlib import Path +from typing import List, Optional + +import click +from tabulate import tabulate + +# Lokale Imports +from config import Config, ConfigError, get_config, reset_config +from extractor import DataAggregator, DocumentExtractor +from paperless_client import PaperlessAPIError, PaperlessClient +from report_generator import ReportGenerator + +# Logger einrichten +logger = logging.getLogger('paperless_report') + + +def setup_logging(level: str = 'INFO', colorize: bool = True) -> None: + """Richtet das Logging ein.""" + log_level = getattr(logging, level.upper(), logging.INFO) + + if colorize: + try: + import colorlog + handler = colorlog.StreamHandler() + handler.setFormatter(colorlog.ColoredFormatter( + '%(log_color)s%(levelname)-8s%(reset)s %(message)s', + log_colors={ + 'DEBUG': 'cyan', + 'INFO': 'green', + 'WARNING': 'yellow', + 'ERROR': 'red', + 'CRITICAL': 'red,bg_white', + } + )) + except ImportError: + handler = logging.StreamHandler() + handler.setFormatter(logging.Formatter('%(levelname)-8s %(message)s')) + else: + handler = logging.StreamHandler() + 
def get_cache(config: Config):
    """Return a disk cache for *config*, or ``None`` if caching is off.

    ``None`` is returned when ``config.cache_enabled`` is false or when
    the ``diskcache`` package cannot be imported; in the latter case a
    warning is logged. The cache directory is created if necessary.
    """
    if config.cache_enabled:
        try:
            # Imported lazily so the tool still runs without diskcache.
            from diskcache import Cache
            directory = config.cache_path
            directory.mkdir(parents=True, exist_ok=True)
            return Cache(str(directory))
        except ImportError:
            logger.warning("diskcache nicht installiert, Cache deaktiviert")

    return None
@cli.command()
@click.option('--year', '-y', type=int, help='Jahr für den Bericht')
@click.option('--month', '-m', type=int, help='Monat (1-12)')
@click.option('--tag', '-t', 'tags', multiple=True, help='Nach Tag filtern (mehrfach möglich)')
@click.option('--correspondent', help='Nach Korrespondent filtern')
@click.option('--group-by', '-g', 'group_by',
              type=click.Choice(['tag', 'correspondent', 'category', 'payment_type', 'month', 'quarter', 'year']),
              multiple=True, default=['tag', 'category', 'month'],
              help='Gruppierung (mehrfach möglich)')
@click.option('--format', '-f', 'output_format',
              type=click.Choice(['cli', 'html', 'pdf', 'json', 'csv']),
              default='cli', help='Ausgabeformat')
@click.option('--output', '-o', 'output_file', type=click.Path(),
              help='Ausgabedatei (optional)')
@click.option('--detail', '-d', is_flag=True, help='Detaillierte Ausgabe')
@click.option('--no-cache', is_flag=True, help='Cache ignorieren')
@click.pass_context
def report(ctx, year: Optional[int], month: Optional[int], tags: tuple,
           correspondent: Optional[str], group_by: tuple, output_format: str,
           output_file: Optional[str], detail: bool, no_cache: bool):
    """
    Generiert einen Finanzbericht.

    Beispiele:

        # Jahresbericht 2024 als CLI
        paperless-report report --year 2024

        # HTML-Bericht mit Tag-Filter
        paperless-report report --year 2024 --tag rechnung --format html

        # Detaillierter Bericht nach Korrespondent gruppiert
        paperless-report report --year 2024 --group-by correspondent --detail

        # PDF für einen bestimmten Monat
        paperless-report report --year 2024 --month 6 --format pdf
    """
    # NOTE: the docstring above is the click help text shown to the user at
    # runtime, so it stays in the tool's UI language (German).
    #
    # Flow: load the matching documents, extract the finance data, aggregate
    # by the requested groupings and emit the result in the chosen format.
    # For every file format an explicit --output path is written directly;
    # otherwise the generator picks a path in its output directory.
    # API errors and unexpected errors terminate with exit status 1.
    config = ctx.obj['config']

    # Reject impossible months before they end up in the API date filter.
    if month is not None and not 1 <= month <= 12:
        click.echo(
            click.style(f"Ungültiger Monat: {month} (erlaubt: 1-12)", fg='red'),
            err=True
        )
        sys.exit(1)

    # Default: current year.
    if not year:
        year = datetime.now().year
        click.echo(f"Kein Jahr angegeben, verwende {year}")

    try:
        cache = None if no_cache else get_cache(config)
        client = PaperlessClient(config, cache)
        extractor = DocumentExtractor(client, config)
        aggregator = DataAggregator(config)
        generator = ReportGenerator(config)

        # Fetch documents.
        click.echo(f"Lade Dokumente für {year}" + (f"/{month}" if month else "") + "...")

        with click.progressbar(length=1, label='API-Abfrage') as bar:
            raw_docs = client.get_documents(
                tags=list(tags) if tags else None,
                correspondent=correspondent,
                year=year,
                month=month,
            )
            bar.update(1)

        if not raw_docs:
            click.echo(click.style("Keine Dokumente gefunden.", fg='yellow'))
            return

        click.echo(f"Gefunden: {len(raw_docs)} Dokumente")

        # Extract finance data from the raw API documents.
        click.echo("Extrahiere Daten...")
        documents = extractor.extract_documents(raw_docs)

        # Aggregate.
        click.echo("Aggregiere Daten...")
        result = aggregator.aggregate(documents, list(group_by))

        # Build the report title.
        if month:
            title = f"Paperless Finanzbericht {month:02d}/{year}"
        else:
            title = f"Paperless Finanzbericht {year}"

        # Output.
        if output_format == 'cli':
            output = generator.generate_cli(result, title, detail)
            click.echo()
            click.echo(output)

        elif output_format == 'html':
            if output_file:
                # Previously only the path was set here and nothing was
                # written; render and store the report like the other formats.
                path = Path(output_file)
                html = generator.generate_html(result, title, year, month)
                with open(path, 'w', encoding='utf-8') as f:
                    f.write(html)
            else:
                path = generator.save_html(result, title, year, month)
            click.echo(click.style(f"HTML-Bericht gespeichert: {path}", fg='green'))

            # Offer to open the report in the browser.
            if click.confirm("Bericht im Browser öffnen?", default=True):
                import webbrowser
                webbrowser.open(f"file://{path.absolute()}")

        elif output_format == 'pdf':
            if output_file:
                path = Path(output_file)
                pdf_bytes = generator.generate_pdf(result, title, year, month)
                with open(path, 'wb') as f:
                    f.write(pdf_bytes)
            else:
                path = generator.save_pdf(result, title, year, month)
            click.echo(click.style(f"PDF-Bericht gespeichert: {path}", fg='green'))

        elif output_format == 'json':
            if output_file:
                path = Path(output_file)
                json_str = generator.generate_json(result)
                with open(path, 'w', encoding='utf-8') as f:
                    f.write(json_str)
            else:
                path = generator.save_json(result, year, month)
            click.echo(click.style(f"JSON-Export gespeichert: {path}", fg='green'))

        elif output_format == 'csv':
            if output_file:
                path = Path(output_file)
                csv_str = generator.generate_csv(documents)
                # utf-8-sig writes a BOM at the start of the file.
                with open(path, 'w', encoding='utf-8-sig') as f:
                    f.write(csv_str)
            else:
                path = generator.save_csv(documents, year, month)
            click.echo(click.style(f"CSV-Export gespeichert: {path}", fg='green'))

    except PaperlessAPIError as e:
        click.echo(click.style(f"API-Fehler: {e}", fg='red'), err=True)
        sys.exit(1)
    except Exception as e:
        # Top-level boundary of the command: log the traceback, then exit.
        logger.exception("Unerwarteter Fehler")
        click.echo(click.style(f"Fehler: {e}", fg='red'), err=True)
        sys.exit(1)
@cli.command()
@click.option('--tag', '-t', 'tags', multiple=True, help='Nach Tag filtern')
@click.option('--year', '-y', type=int, help='Jahr')
@click.option('--limit', '-l', type=int, default=20, help='Anzahl Dokumente')
@click.pass_context
def list_docs(ctx, tags: tuple, year: Optional[int], limit: int):
    """
    Listet Dokumente auf.

    Beispiele:

        # Letzte 20 Dokumente
        paperless-report list-docs

        # Mit Tag-Filter
        paperless-report list-docs --tag rechnung --limit 50
    """
    # NOTE: the docstring above is the click help text shown to the user at
    # runtime, so it stays in the tool's UI language (German).
    #
    # Prints a table (ID, date, title, correspondent, amount) of at most
    # `limit` matching documents, followed by the total number of matches.
    config = ctx.obj['config']

    try:
        cache = get_cache(config)
        client = PaperlessClient(config, cache)
        extractor = DocumentExtractor(client, config)

        raw_docs = client.get_documents(
            tags=list(tags) if tags else None,
            year=year
        )

        if not raw_docs:
            click.echo("Keine Dokumente gefunden.")
            return

        # A negative limit would slice from the end and report a negative
        # "shown" count; treat it as zero instead.
        shown = raw_docs[:max(limit, 0)]
        documents = extractor.extract_documents(shown)

        # Build the table rows.
        table_data = []
        for doc in documents:
            date_text = (
                doc.effective_date.strftime('%d.%m.%Y')
                if doc.effective_date else '-'
            )
            title_text = doc.title[:40] + ('...' if len(doc.title) > 40 else '')
            correspondent_text = doc.correspondent[:20] if doc.correspondent else '-'
            # Only a missing amount is shown as '-'; an amount of zero is a
            # real value and is formatted like any other.
            amount_text = (
                f"{config.currency} {doc.betrag:,.2f}".replace(',', "'")
                if doc.betrag is not None else '-'
            )
            table_data.append([
                doc.id,
                date_text,
                title_text,
                correspondent_text,
                amount_text,
            ])

        headers = ['ID', 'Datum', 'Titel', 'Korrespondent', 'Betrag']
        click.echo(tabulate(table_data, headers=headers, tablefmt='simple'))
        click.echo(f"\nGesamt: {len(raw_docs)} Dokumente (zeige {len(shown)})")

    except PaperlessAPIError as e:
        click.echo(click.style(f"API-Fehler: {e}", fg='red'), err=True)
        sys.exit(1)
def main():
    """Program entry point: run the click command group."""
    # The empty dict becomes the click context object (``ctx.obj``).
    cli(obj={})
class PaperlessClient:
    """Client for the Paperless-ngx REST API.

    Wraps a ``requests`` session (token authentication, automatic retries),
    follows pagination transparently and can cache GET responses in an
    optional cache object. The cache object only needs ``get(key)`` and
    ``set(key, value, expire=...)``; ``diskcache.Cache`` provides both.
    """

    # API endpoints, relative to the configured base URL.
    ENDPOINTS = {
        'documents': '/api/documents/',
        'tags': '/api/tags/',
        'correspondents': '/api/correspondents/',
        'document_types': '/api/document_types/',
        'custom_fields': '/api/custom_fields/',
        'storage_paths': '/api/storage_paths/',
    }

    def __init__(self, config: Optional[Config] = None, cache: Optional[Any] = None):
        """Initialise the API client.

        Args:
            config: Configuration object; the global configuration is
                used when it is omitted.
            cache: Optional cache object (e.g. ``diskcache.Cache``).
        """
        self.config = config or get_config()
        self.base_url = self.config.paperless_url
        self.token = self.config.paperless_token
        self.timeout = self.config.timeout
        self.cache = cache

        # Session with retry logic and default headers.
        self.session = self._create_session()

        # In-memory copies of the metadata lists, filled on first access.
        self._custom_fields_cache: Optional[Dict[int, dict]] = None
        self._tags_cache: Optional[Dict[int, dict]] = None
        self._correspondents_cache: Optional[Dict[int, dict]] = None
        self._document_types_cache: Optional[Dict[int, dict]] = None

    def _create_session(self) -> requests.Session:
        """Create a session with retry configuration and auth headers."""
        session = requests.Session()

        # Retry transient failures (rate limiting and server errors).
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
        )

        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount('http://', adapter)
        session.mount('https://', adapter)

        # Default headers sent with every request.
        session.headers.update({
            'Authorization': f'Token {self.token}',
            'Accept': 'application/json',
            'Content-Type': 'application/json',
        })

        return session

    def _build_url(self, endpoint: str) -> str:
        """Append *endpoint* to the base URL, keeping any path prefix.

        ``urljoin`` would discard a path component of the base URL when
        the endpoint starts with a slash, which breaks installations that
        are served below a sub-path.
        """
        return f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"

    def _get_cache_key(self, endpoint: str, params: Optional[dict] = None) -> str:
        """Build a cache key from base URL, endpoint and query parameters."""
        key_data = f"{self.base_url}{endpoint}"
        if params:
            key_data += json.dumps(params, sort_keys=True)
        # MD5 serves only as a compact key here, not as a security measure.
        return hashlib.md5(key_data.encode()).hexdigest()

    def _request(
        self,
        method: str,
        endpoint: str,
        params: Optional[dict] = None,
        data: Optional[dict] = None,
        use_cache: bool = True
    ) -> dict:
        """Perform an API request and return the decoded JSON body.

        Args:
            method: HTTP method (GET, POST, ...).
            endpoint: API endpoint, relative to the base URL.
            params: Query parameters.
            data: Request body, sent as JSON.
            use_cache: Use the cache object (GET requests only).

        Returns:
            The API response as a dictionary.

        Raises:
            PaperlessAPIError: On HTTP errors, connection problems,
                timeouts, other request failures or a non-JSON response.
        """
        url = self._build_url(endpoint)

        # Compare against None explicitly: diskcache.Cache defines __len__,
        # so an empty cache is falsy and a plain truth test would keep it
        # empty forever.
        cacheable = (
            method.upper() == 'GET' and use_cache and self.cache is not None
        )
        cache_key = self._get_cache_key(endpoint, params) if cacheable else None

        if cacheable:
            cached = self.cache.get(cache_key)
            if cached is not None:
                logger.debug(f"Cache hit für {endpoint}")
                return cached

        logger.debug(f"API Request: {method} {url} params={params}")

        try:
            response = self.session.request(
                method=method,
                url=url,
                params=params,
                json=data,
                timeout=self.timeout
            )
            response.raise_for_status()

        except requests.exceptions.HTTPError as e:
            error_msg = f"HTTP-Fehler: {e}"
            try:
                error_detail = e.response.json()
                error_msg = f"{error_msg} - {error_detail}"
            except (ValueError, AttributeError):
                # Best effort only: the error body is optional detail.
                pass

            # A Response is falsy for 4xx/5xx statuses, so test for None
            # instead of truthiness to keep the status code.
            raise PaperlessAPIError(
                error_msg,
                status_code=(
                    e.response.status_code if e.response is not None else None
                )
            ) from e

        except requests.exceptions.ConnectionError as e:
            raise PaperlessAPIError(
                f"Verbindungsfehler: Kann {self.base_url} nicht erreichen"
            ) from e

        except requests.exceptions.Timeout as e:
            raise PaperlessAPIError(f"Timeout nach {self.timeout}s") from e

        except requests.exceptions.RequestException as e:
            raise PaperlessAPIError(f"Request-Fehler: {e}") from e

        try:
            result = response.json()
        except ValueError as e:
            # The server answered with something other than JSON.
            raise PaperlessAPIError(
                f"Ungültige JSON-Antwort von {url}",
                status_code=response.status_code
            ) from e

        if cacheable:
            self.cache.set(cache_key, result, expire=self.config.cache_ttl)

        return result

    def _get_paginated(
        self,
        endpoint: str,
        params: Optional[dict] = None,
        page_size: int = 100
    ) -> Generator[dict, None, None]:
        """Yield every result object of a paginated endpoint.

        Args:
            endpoint: API endpoint.
            params: Additional query parameters; the dict is not modified.
            page_size: Number of results per page. It replaces any
                ``page_size`` entry in *params*.

        Yields:
            The individual result objects of all pages.
        """
        # Work on a copy so the caller's dictionary is left untouched.
        query = dict(params or {})
        query['page_size'] = page_size
        page = 1

        while True:
            query['page'] = page
            logger.debug(f"Lade Seite {page} von {endpoint}")

            response = self._request('GET', endpoint, params=query)

            yield from response.get('results', [])

            # Stop when the API reports no further page.
            if not response.get('next'):
                break

            page += 1

    def _load_by_id(self, endpoint_key: str) -> Dict[int, dict]:
        """Load all objects of an endpoint into a dict keyed by their ID."""
        return {
            item['id']: item
            for item in self._get_paginated(self.ENDPOINTS[endpoint_key])
        }

    @staticmethod
    def _find_by_name(items: Dict[int, dict], name: str) -> Optional[dict]:
        """Return the first item whose name equals *name*, ignoring case."""
        wanted = name.lower()
        for item in items.values():
            if item['name'].lower() == wanted:
                return item
        return None

    def test_connection(self) -> bool:
        """Test the connection to the Paperless API.

        The cache is bypassed on purpose: a cached response must not make
        an unreachable server look healthy.

        Returns:
            True if the request succeeded, False on any API error.
        """
        try:
            self._request(
                'GET',
                self.ENDPOINTS['tags'],
                params={'page_size': 1},
                use_cache=False
            )
            return True
        except PaperlessAPIError:
            return False

    # ==================== Custom Fields ====================

    def get_custom_fields(self, refresh: bool = False) -> Dict[int, dict]:
        """Return all custom field definitions.

        Args:
            refresh: Ignore the in-memory copy and load again.

        Returns:
            Dictionary with the field ID as key and the definition as value.
        """
        if self._custom_fields_cache is not None and not refresh:
            return self._custom_fields_cache

        fields = self._load_by_id('custom_fields')

        self._custom_fields_cache = fields
        logger.info(f"Geladen: {len(fields)} Custom Fields")
        return fields

    def get_custom_field_by_name(self, name: str) -> Optional[dict]:
        """Find a custom field by name (case-insensitive).

        Args:
            name: Name of the custom field.

        Returns:
            The field definition, or None if no field has that name.
        """
        return self._find_by_name(self.get_custom_fields(), name)

    # ==================== Tags ====================

    def get_tags(self, refresh: bool = False) -> Dict[int, dict]:
        """Return all tags.

        Args:
            refresh: Ignore the in-memory copy and load again.

        Returns:
            Dictionary with the tag ID as key.
        """
        if self._tags_cache is not None and not refresh:
            return self._tags_cache

        tags = self._load_by_id('tags')

        self._tags_cache = tags
        logger.info(f"Geladen: {len(tags)} Tags")
        return tags

    def get_tag_by_name(self, name: str) -> Optional[dict]:
        """Find a tag by name (case-insensitive)."""
        return self._find_by_name(self.get_tags(), name)

    def get_tag_id(self, name: str) -> Optional[int]:
        """Return the ID of the tag called *name*, or None if unknown."""
        tag = self.get_tag_by_name(name)
        return tag['id'] if tag else None

    # ==================== Correspondents ====================

    def get_correspondents(self, refresh: bool = False) -> Dict[int, dict]:
        """Return all correspondents.

        Args:
            refresh: Ignore the in-memory copy and load again.

        Returns:
            Dictionary with the correspondent ID as key.
        """
        if self._correspondents_cache is not None and not refresh:
            return self._correspondents_cache

        correspondents = self._load_by_id('correspondents')

        self._correspondents_cache = correspondents
        logger.info(f"Geladen: {len(correspondents)} Korrespondenten")
        return correspondents

    def get_correspondent_name(self, correspondent_id: int) -> str:
        """Return the name of a correspondent, or a placeholder if unknown."""
        corr = self.get_correspondents().get(correspondent_id)
        return corr['name'] if corr else f"Unbekannt ({correspondent_id})"

    # ==================== Document Types ====================

    def get_document_types(self, refresh: bool = False) -> Dict[int, dict]:
        """Return all document types, keyed by their ID."""
        if self._document_types_cache is not None and not refresh:
            return self._document_types_cache

        doc_types = self._load_by_id('document_types')

        self._document_types_cache = doc_types
        return doc_types

    # ==================== Documents ====================

    def get_documents(
        self,
        tags: Optional[List[Union[int, str]]] = None,
        correspondent: Optional[Union[int, str]] = None,
        document_type: Optional[Union[int, str]] = None,
        year: Optional[int] = None,
        month: Optional[int] = None,
        date_from: Optional[datetime] = None,
        date_to: Optional[datetime] = None,
        query: Optional[str] = None,
        ordering: str = '-archive_date',
        **extra_filters
    ) -> List[dict]:
        """Fetch documents, optionally filtered.

        Names given for tags, correspondent or document type are resolved
        to IDs. If a requested name filter cannot be resolved at all, an
        empty list is returned: dropping the filter would silently return
        every document and distort the report.

        Args:
            tags: List of tag IDs or names.
            correspondent: Correspondent ID or name.
            document_type: Document type ID or name.
            year: Year, applied to the configured date field.
            month: Month (1-12); only evaluated together with *year*.
            date_from: Start date (inclusive).
            date_to: End date (inclusive).
            query: Full-text search.
            ordering: Sort order.
            **extra_filters: Additional filters passed to the API as-is.

        Returns:
            List of documents.
        """
        params: Dict[str, Any] = {'ordering': ordering}

        # Tags: resolve names to IDs, unknown names are reported.
        if tags:
            tag_ids = []
            for tag in tags:
                if isinstance(tag, int):
                    tag_ids.append(tag)
                    continue
                tag_id = self.get_tag_id(tag)
                if tag_id is None:
                    logger.warning(f"Tag nicht gefunden: {tag}")
                else:
                    tag_ids.append(tag_id)

            if not tag_ids:
                return []
            params['tags__id__in'] = ','.join(str(t) for t in tag_ids)

        # Correspondent
        if correspondent:
            if isinstance(correspondent, str):
                match = self._find_by_name(self.get_correspondents(), correspondent)
                if match is None:
                    logger.warning(f"Korrespondent nicht gefunden: {correspondent}")
                    return []
                params['correspondent__id'] = match['id']
            else:
                params['correspondent__id'] = correspondent

        # Document type
        if document_type:
            if isinstance(document_type, str):
                match = self._find_by_name(self.get_document_types(), document_type)
                if match is None:
                    logger.warning(f"Dokumenttyp nicht gefunden: {document_type}")
                    return []
                params['document_type__id'] = match['id']
            else:
                params['document_type__id'] = document_type

        # Date filters
        date_field = self.config.date_field

        if year:
            if month:
                # Specific month: half-open range [first of month, first of next).
                if month == 12:
                    next_year = year + 1
                    next_month = 1
                else:
                    next_year = year
                    next_month = month + 1

                params[f'{date_field}__gte'] = f'{year}-{month:02d}-01'
                params[f'{date_field}__lt'] = f'{next_year}-{next_month:02d}-01'
            else:
                # Whole year
                params[f'{date_field}__year'] = year

        # An explicit start date replaces the lower bound set for a month.
        if date_from:
            params[f'{date_field}__gte'] = date_from.strftime('%Y-%m-%d')

        if date_to:
            params[f'{date_field}__lte'] = date_to.strftime('%Y-%m-%d')

        # Full-text search
        if query:
            params['query'] = query

        # Extra filters may override anything set above.
        params.update(extra_filters)

        # Fetch all pages.
        documents = list(self._get_paginated(self.ENDPOINTS['documents'], params))
        logger.info(f"Geladen: {len(documents)} Dokumente")

        return documents

    def get_document(self, document_id: int) -> dict:
        """Fetch a single document.

        Args:
            document_id: ID of the document.

        Returns:
            The document dictionary.
        """
        endpoint = f"{self.ENDPOINTS['documents']}{document_id}/"
        return self._request('GET', endpoint)

    def get_document_url(self, document_id: int) -> str:
        """Build the web URL of a document."""
        return f"{self.base_url.rstrip('/')}/documents/{document_id}/details"

    def get_document_download_url(self, document_id: int) -> str:
        """Build the download URL of a document."""
        return f"{self.base_url.rstrip('/')}/api/documents/{document_id}/download/"

    # ==================== Helpers ====================

    def resolve_all_metadata(self, documents: List[dict]) -> List[dict]:
        """Add resolved metadata (names instead of IDs) to documents.

        The dictionaries are extended in place with ``tag_names``,
        ``correspondent_name``, ``document_type_name``,
        ``custom_fields_resolved`` and ``web_url``.

        Args:
            documents: List of documents.

        Returns:
            The same list, with the extended documents.
        """
        tags = self.get_tags()
        correspondents = self.get_correspondents()
        doc_types = self.get_document_types()
        custom_fields = self.get_custom_fields()

        for doc in documents:
            # Tag names; unknown IDs get a placeholder.
            doc['tag_names'] = [
                tags.get(tid, {}).get('name', f'Unknown-{tid}')
                for tid in doc.get('tags', [])
            ]

            # Correspondent name
            corr_id = doc.get('correspondent')
            doc['correspondent_name'] = (
                correspondents.get(corr_id, {}).get('name', '')
                if corr_id else ''
            )

            # Document type name
            dt_id = doc.get('document_type')
            doc['document_type_name'] = (
                doc_types.get(dt_id, {}).get('name', '')
                if dt_id else ''
            )

            # Custom fields, keyed by field name.
            doc['custom_fields_resolved'] = {}
            for cf in doc.get('custom_fields', []):
                field_id = cf.get('field')
                field_def = custom_fields.get(field_id, {})
                field_name = field_def.get('name', f'field_{field_id}')
                doc['custom_fields_resolved'][field_name] = {
                    'value': cf.get('value'),
                    'type': field_def.get('data_type', 'string'),
                    'field_id': field_id
                }

            # Web URL
            doc['web_url'] = self.get_document_url(doc['id'])

        return documents

    def get_statistics(self) -> dict:
        """Return general statistics.

        The document total is read from the ``count`` field of a single
        one-item page instead of downloading every document.

        Returns:
            Dictionary with the totals of documents, tags, correspondents
            and custom fields.
        """
        first_page = self._request(
            'GET',
            self.ENDPOINTS['documents'],
            params={'page_size': 1}
        )
        total_documents = first_page.get(
            'count', len(first_page.get('results', []))
        )

        return {
            'total_documents': total_documents,
            'total_tags': len(self.get_tags()),
            'total_correspondents': len(self.get_correspondents()),
            'total_custom_fields': len(self.get_custom_fields()),
        }
class DecimalEncoder(json.JSONEncoder):
    """JSON encoder for the value types used in finance reports.

    Handles ``Decimal`` (as float), ``datetime`` (as ISO 8601 string),
    ``FinanceDocument`` and ``GroupStats`` (as plain dictionaries).
    Everything else is passed on to the base class.
    """

    def default(self, obj):
        """Return a JSON-serialisable representation of *obj*."""
        if isinstance(obj, Decimal):
            return float(obj)
        if isinstance(obj, datetime):
            return obj.isoformat()
        if isinstance(obj, FinanceDocument):
            return {
                'id': obj.id,
                'title': obj.title,
                # Only a missing amount becomes null; an amount of zero is
                # a real value and must be exported as 0.0.
                'betrag': float(obj.betrag) if obj.betrag is not None else None,
                'effective_date': obj.effective_date.isoformat() if obj.effective_date else None,
                'correspondent': obj.correspondent,
                'kategorie': obj.kategorie,
                'tags': obj.tags,
                'web_url': obj.web_url,
            }
        if isinstance(obj, GroupStats):
            return {
                'name': obj.name,
                'amount': float(obj.amount),
                'count': obj.count,
                'percentage': obj.percentage,
            }
        return super().default(obj)
+ + Args: + config: Konfiguration + """ + self.config = config or get_config() + self.currency = self.config.currency + + # Jinja2 Template-Umgebung + template_dir = Path(__file__).parent / 'templates' + self.jinja_env = Environment( + loader=FileSystemLoader(str(template_dir)), + autoescape=select_autoescape(['html', 'xml']), + ) + + # Custom Filter registrieren + self.jinja_env.filters['format_amount'] = self._format_amount + self.jinja_env.filters['format_percent'] = self._format_percent + self.jinja_env.filters['format_date'] = self._format_date + + def _format_amount(self, value: Optional[Decimal], with_currency: bool = True) -> str: + """Formatiert einen Betrag.""" + if value is None: + return '-' + formatted = f"{value:,.2f}".replace(',', "'") + if with_currency: + return f"{self.currency} {formatted}" + return formatted + + def _format_percent(self, value: float) -> str: + """Formatiert einen Prozentwert.""" + return f"{value:.1f}%" + + def _format_date(self, value: Optional[datetime], fmt: str = '%d.%m.%Y') -> str: + """Formatiert ein Datum.""" + if value is None: + return '-' + return value.strftime(fmt) + + def _ensure_output_dir(self) -> Path: + """Stellt sicher, dass das Ausgabeverzeichnis existiert.""" + output_dir = self.config.output_path + output_dir.mkdir(parents=True, exist_ok=True) + return output_dir + + def _get_output_filename( + self, + year: Optional[int] = None, + month: Optional[int] = None, + extension: str = 'html' + ) -> str: + """Generiert den Ausgabe-Dateinamen.""" + pattern = self.config.get('output.filename_pattern', 'finanzbericht_{year}') + + now = datetime.now() + filename = pattern.format( + year=year or now.year, + month=month or now.month, + date=now.strftime('%Y-%m-%d'), + timestamp=now.strftime('%Y%m%d_%H%M%S'), + ) + + return f"{filename}.{extension}" + + # ==================== CLI Output ==================== + + def generate_cli( + self, + result: AggregationResult, + title: str = "Paperless Finanzbericht", + detail: bool 
= False + ) -> str: + """ + Generiert CLI-Ausgabe. + + Args: + result: Aggregationsergebnis + title: Berichtstitel + detail: Detailansicht aktivieren + + Returns: + Formatierter String für CLI-Ausgabe + """ + lines = [] + sep = "=" * 60 + + # Header + lines.append(sep) + lines.append(title.center(60)) + lines.append(sep) + lines.append("") + + # Übersicht + lines.append(f"Dokumente gesamt: {result.document_count}") + lines.append(f" - mit Betrag: {result.documents_with_amount}") + lines.append(f" - ohne Betrag: {result.documents_without_amount}") + lines.append("") + lines.append(f"Gesamtsumme: {self._format_amount(result.total_amount)}") + lines.append(f"Durchschnitt: {self._format_amount(result.average_amount)}") + lines.append(f"Median: {self._format_amount(result.median_amount)}") + lines.append(f"Minimum: {self._format_amount(result.min_amount)}") + lines.append(f"Maximum: {self._format_amount(result.max_amount)}") + lines.append("") + + # Nach Tag + if result.by_tag: + lines.append("-" * 60) + lines.append("Nach Tag:") + lines.append("-" * 60) + for name, stats in result.by_tag.items(): + amount_str = self._format_amount(stats.amount).rjust(18) + pct_str = f"({stats.percentage:5.1f}%)" + lines.append(f" {name:<25} {amount_str} {pct_str}") + lines.append("") + + # Nach Korrespondent + if result.by_correspondent and detail: + lines.append("-" * 60) + lines.append("Nach Korrespondent:") + lines.append("-" * 60) + for name, stats in list(result.by_correspondent.items())[:15]: + amount_str = self._format_amount(stats.amount).rjust(18) + pct_str = f"({stats.percentage:5.1f}%)" + lines.append(f" {name[:25]:<25} {amount_str} {pct_str}") + if len(result.by_correspondent) > 15: + lines.append(f" ... 
und {len(result.by_correspondent) - 15} weitere") + lines.append("") + + # Nach Kategorie + if result.by_category: + lines.append("-" * 60) + lines.append("Nach Kategorie:") + lines.append("-" * 60) + for name, stats in result.by_category.items(): + amount_str = self._format_amount(stats.amount).rjust(18) + pct_str = f"({stats.percentage:5.1f}%)" + lines.append(f" {name[:25]:<25} {amount_str} {pct_str}") + lines.append("") + + # Nach Monat + if result.by_month: + lines.append("-" * 60) + lines.append("Nach Monat:") + lines.append("-" * 60) + for month, stats in result.by_month.items(): + amount_str = self._format_amount(stats.amount).rjust(18) + lines.append(f" {month:<10} {amount_str} ({stats.count} Dok.)") + lines.append("") + + # Nach Zahlungsart + if result.by_payment_type and detail: + lines.append("-" * 60) + lines.append("Nach Zahlungsart:") + lines.append("-" * 60) + for name, stats in result.by_payment_type.items(): + amount_str = self._format_amount(stats.amount).rjust(18) + pct_str = f"({stats.percentage:5.1f}%)" + lines.append(f" {name:<25} {amount_str} {pct_str}") + lines.append("") + + # Top-Posten + if result.top_items and detail: + lines.append("-" * 60) + lines.append("Top 10 Einzelposten:") + lines.append("-" * 60) + for i, doc in enumerate(result.top_items[:10], 1): + amount_str = self._format_amount(doc.betrag).rjust(18) + title = doc.title[:35] + lines.append(f" {i:2}. {title:<35} {amount_str}") + lines.append("") + + lines.append(sep) + lines.append(f"Generiert: {datetime.now().strftime('%d.%m.%Y %H:%M')}") + lines.append(sep) + + return "\n".join(lines) + + # ==================== HTML Output ==================== + + def generate_html( + self, + result: AggregationResult, + title: str = "Paperless Finanzbericht", + year: Optional[int] = None, + month: Optional[int] = None, + comparison: Optional[Dict] = None + ) -> str: + """ + Generiert HTML-Bericht. 
+ + Args: + result: Aggregationsergebnis + title: Berichtstitel + year: Jahr für den Bericht + month: Monat für den Bericht (optional) + comparison: Vergleichsdaten (optional) + + Returns: + HTML-String + """ + template = self.jinja_env.get_template('report.html') + + # Chart-Daten vorbereiten + tag_chart_data = self._prepare_chart_data(result.by_tag) + category_chart_data = self._prepare_chart_data(result.by_category) + month_chart_data = self._prepare_line_chart_data(result.by_month) + correspondent_chart_data = self._prepare_chart_data( + dict(list(result.by_correspondent.items())[:10]) + ) + + context = { + 'title': title, + 'year': year, + 'month': month, + 'currency': self.currency, + 'generated_at': datetime.now(), + 'result': result, + 'comparison': comparison, + + # Chart-Daten als JSON + 'tag_chart_data': json.dumps(tag_chart_data), + 'category_chart_data': json.dumps(category_chart_data), + 'month_chart_data': json.dumps(month_chart_data), + 'correspondent_chart_data': json.dumps(correspondent_chart_data), + } + + return template.render(**context) + + def _prepare_chart_data(self, groups: Dict[str, GroupStats]) -> Dict[str, Any]: + """Bereitet Daten für ein Balken-/Kreisdiagramm vor.""" + labels = [] + values = [] + colors = self._generate_colors(len(groups)) + + for name, stats in groups.items(): + labels.append(name) + values.append(float(stats.amount)) + + return { + 'labels': labels, + 'values': values, + 'colors': colors, + } + + def _prepare_line_chart_data(self, groups: Dict[str, GroupStats]) -> Dict[str, Any]: + """Bereitet Daten für ein Liniendiagramm vor.""" + # Nach Datum sortieren + sorted_items = sorted(groups.items()) + + labels = [item[0] for item in sorted_items] + values = [float(item[1].amount) for item in sorted_items] + + return { + 'labels': labels, + 'values': values, + } + + def _generate_colors(self, count: int) -> List[str]: + """Generiert eine Farbpalette.""" + # Vordefinierte Farben + colors = [ + '#2E86AB', # Blau + '#A23B72', 
# Magenta + '#F18F01', # Orange + '#C73E1D', # Rot + '#3B1F2B', # Dunkelrot + '#95C623', # Grün + '#5C5D67', # Grau + '#E8D21D', # Gelb + '#1B998B', # Türkis + '#7768AE', # Lila + ] + + # Farben wiederholen falls nötig + while len(colors) < count: + colors.extend(colors) + + return colors[:count] + + def save_html( + self, + result: AggregationResult, + title: str = "Paperless Finanzbericht", + year: Optional[int] = None, + month: Optional[int] = None, + comparison: Optional[Dict] = None, + filename: Optional[str] = None + ) -> Path: + """ + Speichert HTML-Bericht als Datei. + + Returns: + Pfad zur erstellten Datei + """ + html = self.generate_html(result, title, year, month, comparison) + + output_dir = self._ensure_output_dir() + if filename is None: + filename = self._get_output_filename(year, month, 'html') + + output_path = output_dir / filename + + with open(output_path, 'w', encoding='utf-8') as f: + f.write(html) + + logger.info(f"HTML-Bericht gespeichert: {output_path}") + return output_path + + # ==================== PDF Output ==================== + + def generate_pdf( + self, + result: AggregationResult, + title: str = "Paperless Finanzbericht", + year: Optional[int] = None, + month: Optional[int] = None, + comparison: Optional[Dict] = None + ) -> bytes: + """ + Generiert PDF-Bericht. + + Returns: + PDF als Bytes + """ + try: + from weasyprint import HTML, CSS + except ImportError: + raise ImportError( + "WeasyPrint ist nicht installiert. 
" + "Installiere mit: pip install weasyprint" + ) + + # HTML generieren + html_content = self.generate_html(result, title, year, month, comparison) + + # PDF generieren + html = HTML(string=html_content) + + # Zusätzliches CSS für PDF + pdf_css = CSS(string=''' + @page { + size: A4; + margin: 2cm; + } + body { + font-size: 10pt; + } + .chart-container { + page-break-inside: avoid; + } + table { + page-break-inside: avoid; + } + ''') + + return html.write_pdf(stylesheets=[pdf_css]) + + def save_pdf( + self, + result: AggregationResult, + title: str = "Paperless Finanzbericht", + year: Optional[int] = None, + month: Optional[int] = None, + comparison: Optional[Dict] = None, + filename: Optional[str] = None + ) -> Path: + """ + Speichert PDF-Bericht als Datei. + + Returns: + Pfad zur erstellten Datei + """ + pdf_bytes = self.generate_pdf(result, title, year, month, comparison) + + output_dir = self._ensure_output_dir() + if filename is None: + filename = self._get_output_filename(year, month, 'pdf') + + output_path = output_dir / filename + + with open(output_path, 'wb') as f: + f.write(pdf_bytes) + + logger.info(f"PDF-Bericht gespeichert: {output_path}") + return output_path + + # ==================== JSON Output ==================== + + def generate_json( + self, + result: AggregationResult, + indent: int = 2 + ) -> str: + """ + Generiert JSON-Ausgabe. 
+ + Returns: + JSON-String + """ + data = { + 'generated_at': datetime.now().isoformat(), + 'currency': self.currency, + 'summary': { + 'total_amount': result.total_amount, + 'document_count': result.document_count, + 'documents_with_amount': result.documents_with_amount, + 'documents_without_amount': result.documents_without_amount, + 'average_amount': result.average_amount, + 'median_amount': result.median_amount, + 'min_amount': result.min_amount, + 'max_amount': result.max_amount, + }, + 'by_tag': result.by_tag, + 'by_correspondent': result.by_correspondent, + 'by_category': result.by_category, + 'by_payment_type': result.by_payment_type, + 'by_month': result.by_month, + 'top_items': result.top_items[:20], + 'documents': result.documents, + } + + return json.dumps(data, indent=indent, cls=DecimalEncoder, ensure_ascii=False) + + def save_json( + self, + result: AggregationResult, + year: Optional[int] = None, + month: Optional[int] = None, + filename: Optional[str] = None + ) -> Path: + """ + Speichert JSON-Bericht als Datei. + + Returns: + Pfad zur erstellten Datei + """ + json_str = self.generate_json(result) + + output_dir = self._ensure_output_dir() + if filename is None: + filename = self._get_output_filename(year, month, 'json') + + output_path = output_dir / filename + + with open(output_path, 'w', encoding='utf-8') as f: + f.write(json_str) + + logger.info(f"JSON-Bericht gespeichert: {output_path}") + return output_path + + # ==================== CSV Output ==================== + + def generate_csv( + self, + documents: List[FinanceDocument], + delimiter: str = ';' + ) -> str: + """ + Generiert CSV-Export der Dokumente. 
+ + Returns: + CSV-String + """ + lines = [] + + # Header + headers = [ + 'ID', 'Titel', 'Datum', 'Betrag', 'Korrespondent', + 'Kategorie', 'Zahlungsart', 'Tags', 'URL' + ] + lines.append(delimiter.join(headers)) + + # Daten + for doc in documents: + row = [ + str(doc.id), + f'"{doc.title}"' if delimiter in doc.title else doc.title, + self._format_date(doc.effective_date), + self._format_amount(doc.betrag, with_currency=False) if doc.betrag else '', + doc.correspondent or '', + doc.kategorie or '', + doc.zahlungsart or '', + ', '.join(doc.tags), + doc.web_url or '', + ] + lines.append(delimiter.join(row)) + + return '\n'.join(lines) + + def save_csv( + self, + documents: List[FinanceDocument], + year: Optional[int] = None, + month: Optional[int] = None, + filename: Optional[str] = None + ) -> Path: + """Speichert CSV-Export als Datei.""" + csv_str = self.generate_csv(documents) + + output_dir = self._ensure_output_dir() + if filename is None: + filename = self._get_output_filename(year, month, 'csv') + + output_path = output_dir / filename + + with open(output_path, 'w', encoding='utf-8-sig') as f: # BOM für Excel + f.write(csv_str) + + logger.info(f"CSV-Export gespeichert: {output_path}") + return output_path + + # ==================== Vergleichsbericht ==================== + + def generate_comparison_cli(self, comparison: Dict) -> str: + """Generiert CLI-Ausgabe für Periodenvergleich.""" + lines = [] + sep = "=" * 70 + + p1 = comparison['period1'] + p2 = comparison['period2'] + + lines.append(sep) + lines.append(f"Vergleich: {p1['name']} vs {p2['name']}".center(70)) + lines.append(sep) + lines.append("") + + # Übersicht + lines.append(f"{'Kennzahl':<30} {p1['name']:>15} {p2['name']:>15} {'Diff':>10}") + lines.append("-" * 70) + + lines.append( + f"{'Gesamtsumme':<30} " + f"{self._format_amount(p1['total'], False):>15} " + f"{self._format_amount(p2['total'], False):>15} " + f"{comparison['diff_percent']:>+9.1f}%" + ) + + lines.append( + f"{'Anzahl Dokumente':<30} 
" + f"{p1['count']:>15} " + f"{p2['count']:>15} " + f"{p2['count'] - p1['count']:>+10}" + ) + + lines.append("") + lines.append("-" * 70) + lines.append("Nach Kategorie:") + lines.append("-" * 70) + + for cat, data in sorted( + comparison['category_comparison'].items(), + key=lambda x: abs(x[1]['diff_absolute']), + reverse=True + ): + status = "" + if data['status'] == 'new': + status = "[NEU]" + elif data['status'] == 'removed': + status = "[ENTF]" + + lines.append( + f" {cat[:25]:<25} " + f"{self._format_amount(data['period1'], False):>12} " + f"{self._format_amount(data['period2'], False):>12} " + f"{data['diff_percent']:>+8.1f}% " + f"{status}" + ) + + lines.append("") + lines.append(sep) + + return "\n".join(lines) diff --git a/paperless-report/requirements.txt b/paperless-report/requirements.txt new file mode 100644 index 0000000..7032bbc --- /dev/null +++ b/paperless-report/requirements.txt @@ -0,0 +1,31 @@ +# Paperless Finance Report Tool - Dependencies + +# HTTP Client +requests>=2.31.0 + +# CLI Framework +click>=8.1.7 + +# Configuration +pyyaml>=6.0.1 + +# HTML Templating +jinja2>=3.1.2 + +# PDF Generation +weasyprint>=60.1 + +# Data Processing +python-dateutil>=2.8.2 + +# Caching +diskcache>=5.6.3 + +# Logging (colorized output) +colorlog>=6.8.0 + +# Progress bars +tqdm>=4.66.1 + +# Table formatting for CLI +tabulate>=0.9.0 diff --git a/paperless-report/setup.py b/paperless-report/setup.py new file mode 100644 index 0000000..eb03d57 --- /dev/null +++ b/paperless-report/setup.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python3 +""" +Setup-Skript für das Paperless Finance Report Tool. 
+""" + +from setuptools import setup, find_packages +from pathlib import Path + +# README einlesen +readme_path = Path(__file__).parent / 'README.md' +long_description = '' +if readme_path.exists(): + long_description = readme_path.read_text(encoding='utf-8') + +setup( + name='paperless-report', + version='1.0.0', + description='Finanz-Auswertungstool für Paperless-ngx', + long_description=long_description, + long_description_content_type='text/markdown', + author='Your Name', + author_email='your.email@example.com', + url='https://github.com/yourusername/paperless-report', + license='MIT', + + py_modules=[ + 'main', + 'config', + 'paperless_client', + 'extractor', + 'report_generator', + ], + + include_package_data=True, + package_data={ + '': ['templates/*.html', 'config.yaml.example'], + }, + + install_requires=[ + 'requests>=2.31.0', + 'click>=8.1.7', + 'pyyaml>=6.0.1', + 'jinja2>=3.1.2', + 'python-dateutil>=2.8.2', + 'tabulate>=0.9.0', + 'tqdm>=4.66.1', + ], + + extras_require={ + 'full': [ + 'weasyprint>=60.1', + 'diskcache>=5.6.3', + 'colorlog>=6.8.0', + ], + 'dev': [ + 'pytest>=7.4.0', + 'pytest-cov>=4.1.0', + 'black>=23.7.0', + 'flake8>=6.1.0', + 'mypy>=1.5.0', + ], + }, + + entry_points={ + 'console_scripts': [ + 'paperless-report=main:main', + ], + }, + + python_requires='>=3.8', + + classifiers=[ + 'Development Status :: 4 - Beta', + 'Environment :: Console', + 'Intended Audience :: End Users/Desktop', + 'License :: OSI Approved :: MIT License', + 'Operating System :: OS Independent', + 'Programming Language :: Python :: 3', + 'Programming Language :: Python :: 3.8', + 'Programming Language :: Python :: 3.9', + 'Programming Language :: Python :: 3.10', + 'Programming Language :: Python :: 3.11', + 'Programming Language :: Python :: 3.12', + 'Topic :: Office/Business :: Financial :: Accounting', + ], + + keywords='paperless paperless-ngx finance report accounting', +) diff --git a/paperless-report/templates/report.html 
b/paperless-report/templates/report.html new file mode 100644 index 0000000..fc2cfde --- /dev/null +++ b/paperless-report/templates/report.html @@ -0,0 +1,848 @@ + + + + + + {{ title }} + + + + +
+

{{ title }}

+
+ {% if year %} +
Zeitraum: + {% if month %} + {{ '%02d'|format(month) }}/{{ year }} + {% else %} + {{ year }} + {% endif %} +
+ {% endif %} +
Generiert: {{ generated_at|format_date('%d.%m.%Y %H:%M') }}
+
+
+ + +
+
+
Gesamtsumme
+
{{ result.total_amount|format_amount }}
+
{{ result.documents_with_amount }} Dokumente mit Betrag
+
+
+
Dokumente
+
{{ result.document_count }}
+
{{ result.documents_without_amount }} ohne Betrag
+
+
+
Durchschnitt
+
{{ result.average_amount|format_amount }}
+
pro Dokument
+
+
+
Median
+
{{ result.median_amount|format_amount }}
+
Min: {{ result.min_amount|format_amount }}
+
+
+ + +
+ {% if result.by_tag %} +
+

Verteilung nach Tag

+
+ +
+
+ {% endif %} + + {% if result.by_category %} +
+

Verteilung nach Kategorie

+
+ +
+
+ {% endif %} + + {% if result.by_month %} +
+

Monatsverlauf

+
+ +
+
+ {% endif %} + + {% if result.by_correspondent %} +
+

Top 10 Korrespondenten

+
+ +
+
+ {% endif %} +
+ + + {% if result.by_tag %} +
+

Nach Tag

+ + + + + + + + + + + + {% for name, stats in result.by_tag.items() %} + + + + + + + + {% endfor %} + +
TagBetragAnteilAnzahlVerteilung
{{ name }}{{ stats.amount|format_amount }}{{ stats.percentage|format_percent }}{{ stats.count }} +
+
+
+
+
+ {% endif %} + + + {% if result.by_category %} +
+

Nach Kategorie

+ + + + + + + + + + + + {% for name, stats in result.by_category.items() %} + + + + + + + + {% endfor %} + +
KategorieBetragAnteilAnzahlVerteilung
{{ name }}{{ stats.amount|format_amount }}{{ stats.percentage|format_percent }}{{ stats.count }} +
+
+
+
+
+ {% endif %} + + + {% if result.by_month %} +
+

Nach Monat

+ + + + + + + + + + {% for month, stats in result.by_month.items() %} + + + + + + {% endfor %} + +
MonatBetragAnzahl
{{ month }}{{ stats.amount|format_amount }}{{ stats.count }}
+
+ {% endif %} + + + {% if result.by_correspondent %} +
+

Nach Korrespondent

+ + + + + + + + + + + {% for name, stats in result.by_correspondent.items() %} + + + + + + + {% endfor %} + +
KorrespondentBetragAnteilAnzahl
{{ name }}{{ stats.amount|format_amount }}{{ stats.percentage|format_percent }}{{ stats.count }}
+
+ {% endif %} + + + {% if result.top_items %} +
+

Top 10 Einzelposten

+ + + + + + + + + + + + {% for doc in result.top_items[:10] %} + + + + + + + + {% endfor %} + +
#TitelDatumKorrespondentBetrag
{{ loop.index }} + {% if doc.web_url %} + {{ doc.title }} + {% else %} + {{ doc.title }} + {% endif %} + {{ doc.effective_date|format_date }}{{ doc.correspondent or '-' }}{{ doc.betrag|format_amount }}
+
+ {% endif %} + + + {% if comparison %} +
+

Periodenvergleich: {{ comparison.period1.name }} vs {{ comparison.period2.name }}

+ +
+
+
{{ comparison.period1.name }}
+
{{ comparison.period1.total|format_amount }}
+
{{ comparison.period1.count }} Dokumente
+
+
+
{{ comparison.period2.name }}
+
{{ comparison.period2.total|format_amount }}
+
{{ comparison.period2.count }} Dokumente
+
+
+
Veränderung
+
+ {{ '%+.1f'|format(comparison.diff_percent) }}% +
+
{{ comparison.diff_absolute|format_amount }}
+
+
+ + + + + + + + + + + + + {% for cat, data in comparison.category_comparison.items() %} + + + + + + + + {% endfor %} + +
Kategorie{{ comparison.period1.name }}{{ comparison.period2.name }}DifferenzVeränderung
{{ cat }}{{ data.period1|format_amount }}{{ data.period2|format_amount }}{{ data.diff_absolute|format_amount }} + {{ '%+.1f'|format(data.diff_percent) }}% +
+
+ {% endif %} + + +
+

Alle Dokumente ({{ result.document_count }})

+ + + + + + + + + + + + {% for doc in result.documents %} + + + + + + + + {% endfor %} + +
DatumTitelKorrespondentTagsBetrag
{{ doc.effective_date|format_date }} + {% if doc.web_url %} + {{ doc.title }} + {% else %} + {{ doc.title }} + {% endif %} + {{ doc.correspondent or '-' }} + {% for tag in doc.tags %} + {{ tag }} + {% endfor %} + {{ doc.betrag|format_amount if doc.betrag else '-' }}
+
+ + +
+ + +
+ + + + + +