Compare commits

...

2 Commits

Author SHA1 Message Date
Claude d2dd837f26 Add Paperless Finance Report Tool - Complete implementation
A Python CLI tool for generating financial reports from Paperless-ngx:

- Phase 1 (MVP): Config handling, Paperless API client with auth and
  pagination, custom fields extraction, tag-based summation, CLI output
- Phase 2 (Grouping): Multiple grouping criteria (tag, correspondent,
  category, payment type, month, quarter, year), percentage distribution
- Phase 3 (Reports): HTML reports with Chart.js diagrams (doughnut, bar,
  line charts), PDF export via WeasyPrint, JSON and CSV export
- Phase 4 (Comfort): Automatic tag ID resolution, disk caching with
  diskcache, colorized logging, comprehensive error handling

Features:
- Flexible date filtering (year, month, date range)
- Period comparison with change analysis
- Swiss franc formatting (CHF with apostrophe separators)
- Interactive HTML reports with sortable tables and document links
- Multiple output formats (CLI, HTML, PDF, JSON, CSV)
2025-12-07 10:09:10 +00:00
admin 3134418e6a Merge pull request #1 from metacube2/claude/github-sync-website-017YXsy55JgZ3uUCZx13NfZG
GitHub Sync Website with Apache Server
2025-12-06 10:55:26 +01:00
13 changed files with 3824 additions and 0 deletions
+41
View File
@@ -0,0 +1,41 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# Virtual environment
venv/
env/
.venv/
# Config with secrets
config.yaml
# Cache
.cache/
# Output files
output/*.html
output/*.pdf
output/*.json
output/*.csv
# IDE
.idea/
.vscode/
*.swp
*.swo
# Distribution / packaging
dist/
build/
*.egg-info/
# Testing
.pytest_cache/
.coverage
htmlcov/
# Misc
.DS_Store
*.log
+198
View File
@@ -0,0 +1,198 @@
# Paperless Finance Report
Ein Python-basiertes CLI-Tool, das über die Paperless-ngx REST-API Dokumente abruft, Beträge und Custom Fields extrahiert und daraus Finanzberichte generiert.
## Features
- **Basis-Auswertung**: Summe aller Beträge nach Tags, Kategorien, Korrespondenten
- **Zeiträume**: Filter nach Jahr, Monat oder beliebigem Datumsbereich
- **Gruppierung**: Nach Tag, Kategorie, Korrespondent, Zahlungsart, Monat, Quartal
- **Vergleichsberichte**: Jahresvergleiche mit Veränderungsanalyse
- **Mehrere Ausgabeformate**: CLI, HTML (mit Chart.js Diagrammen), PDF, JSON, CSV
- **Caching**: Optionaler Festplatten-Cache für bessere Performance
- **Flexibel**: Konfigurierbare Custom Field Namen
## Installation
### Voraussetzungen
- Python 3.8+
- Paperless-ngx Installation mit REST-API Zugriff
- API-Token (erstellen unter: Paperless → Einstellungen → Authentifizierungs-Tokens)
### Installation
```bash
# Repository klonen
git clone https://github.com/yourusername/paperless-report.git
cd paperless-report
# Virtuelle Umgebung erstellen
python3 -m venv venv
source venv/bin/activate # Linux/macOS
# oder: venv\Scripts\activate # Windows
# Dependencies installieren
pip install -r requirements.txt
# Optional: Vollinstallation mit PDF-Support
pip install -e ".[full]"
```
### Konfiguration
```bash
# Beispiel-Konfiguration erstellen
cp config.yaml.example config.yaml
# Konfiguration anpassen
nano config.yaml
```
Mindestens erforderlich:
```yaml
paperless:
url: "http://localhost:8000" # Deine Paperless URL
token: "YOUR_API_TOKEN" # API Token
```
Alternativ kann der Token auch als Umgebungsvariable gesetzt werden:
```bash
export PAPERLESS_TOKEN="your_api_token"
```
## Verwendung
### Verbindung testen
```bash
python main.py test
```
### Jahresbericht
```bash
# CLI-Ausgabe
python main.py report --year 2024
# Mit Details
python main.py report --year 2024 --detail
# HTML-Bericht
python main.py report --year 2024 --format html
# PDF-Bericht
python main.py report --year 2024 --format pdf
```
### Mit Filtern
```bash
# Nach Tag filtern
python main.py report --year 2024 --tag rechnung
# Nach Korrespondent filtern
python main.py report --year 2024 --correspondent "Swisscom"
# Nach Monat filtern
python main.py report --year 2024 --month 6
```
### Gruppierung
```bash
# Nach Tag gruppieren (Standard)
python main.py report --year 2024 --group-by tag
# Nach Korrespondent gruppieren
python main.py report --year 2024 --group-by correspondent
# Nach Kategorie und Monat gruppieren
python main.py report --year 2024 --group-by category --group-by month
```
### Jahresvergleich
```bash
# CLI-Vergleich
python main.py compare 2023 2024
# HTML-Vergleichsbericht
python main.py compare 2023 2024 --format html
```
### Weitere Befehle
```bash
# Dokumente auflisten
python main.py list-docs --tag rechnung --limit 50
# Cache löschen
python main.py clear-cache
# Hilfe anzeigen
python main.py --help
python main.py report --help
```
## Custom Fields in Paperless
Für die volle Funktionalität sollten folgende Custom Fields in Paperless angelegt werden:
| Feldname | Typ | Beschreibung |
|-----------------|----------|---------------------------------------|
| `betrag` | Währung | Rechnungsbetrag |
| `rechnungsdatum`| Datum | Datum der Rechnung |
| `kategorie` | Auswahl | Wohnen, Gesundheit, Mobilität, etc. |
| `zahlungsart` | Auswahl | Bar, Einzahlung, LSV, eBill |
Die Feldnamen können in der `config.yaml` angepasst werden.
## Ausgabeformate
### CLI
Einfache tabellarische Ausgabe im Terminal.
### HTML
Interaktiver Bericht mit:
- Zusammenfassungskarten
- Chart.js Diagramme (Doughnut, Bar, Line)
- Sortierbare Tabellen
- Links zu Paperless-Dokumenten
- Export-Button für CSV
### PDF
Druckfertiger PDF-Bericht (benötigt WeasyPrint).
### JSON
Maschinenlesbares Format für weitere Verarbeitung.
### CSV
Excel-kompatibles Format mit BOM für korrekte Umlaute.
## Projektstruktur
```
paperless-report/
├── config.yaml.example # Beispiel-Konfiguration
├── config.py # Konfigurationsmanagement
├── paperless_client.py # API-Client
├── extractor.py # Datenextraktion und -aggregation
├── report_generator.py # Berichtsgenerierung
├── main.py # CLI-Einstiegspunkt
├── templates/
│ └── report.html # HTML-Template
├── output/ # Generierte Berichte
├── requirements.txt
├── setup.py
└── README.md
```
## Lizenz
MIT License
+24
View File
@@ -0,0 +1,24 @@
"""
Paperless Finance Report Tool
Generiert Finanzberichte aus Paperless-ngx Dokumenten.
"""
__version__ = '1.0.0'
__author__ = 'Your Name'
from config import Config, get_config
from paperless_client import PaperlessClient, PaperlessAPIError
from extractor import DocumentExtractor, DataAggregator, FinanceDocument
from report_generator import ReportGenerator
__all__ = [
'Config',
'get_config',
'PaperlessClient',
'PaperlessAPIError',
'DocumentExtractor',
'DataAggregator',
'FinanceDocument',
'ReportGenerator',
]
+269
View File
@@ -0,0 +1,269 @@
"""
Konfigurationsmanagement für das Paperless Finance Report Tool.
Lädt und validiert die YAML-Konfiguration.
"""
import os
import sys
from pathlib import Path
from typing import Any, Optional
import yaml
class ConfigError(Exception):
"""Fehler bei der Konfiguration."""
pass
class Config:
"""Konfigurationsklasse für das Paperless Finance Report Tool."""
DEFAULT_CONFIG = {
'paperless': {
'url': 'http://localhost:8000',
'token': '',
'timeout': 30,
},
'custom_fields': {
'betrag': 'betrag',
'rechnungsdatum': 'rechnungsdatum',
'kategorie': 'kategorie',
'zahlungsart': 'zahlungsart',
'periode': 'periode',
'notiz': 'notiz',
},
'defaults': {
'currency': 'CHF',
'date_field': 'archive_date',
'invoice_tag': 'rechnung',
},
'tags': ['rechnung'],
'categories': [],
'output': {
'format': 'html',
'path': './output',
'filename_pattern': 'finanzbericht_{year}',
},
'cache': {
'enabled': True,
'path': './.cache',
'ttl': 3600,
},
'logging': {
'level': 'INFO',
'file': '',
'colorize': True,
},
}
def __init__(self, config_path: Optional[str] = None):
"""
Initialisiert die Konfiguration.
Args:
config_path: Pfad zur config.yaml. Falls None, wird im aktuellen
Verzeichnis und im Script-Verzeichnis gesucht.
"""
self._config = self.DEFAULT_CONFIG.copy()
self._config_path = self._find_config(config_path)
if self._config_path:
self._load_config()
self._validate_config()
def _find_config(self, config_path: Optional[str]) -> Optional[Path]:
"""Sucht nach der Konfigurationsdatei."""
if config_path:
path = Path(config_path)
if path.exists():
return path
raise ConfigError(f"Konfigurationsdatei nicht gefunden: {config_path}")
# Suchpfade
search_paths = [
Path.cwd() / 'config.yaml',
Path.cwd() / 'config.yml',
Path(__file__).parent / 'config.yaml',
Path(__file__).parent / 'config.yml',
Path.home() / '.config' / 'paperless-report' / 'config.yaml',
]
# Umgebungsvariable prüfen
env_path = os.environ.get('PAPERLESS_REPORT_CONFIG')
if env_path:
search_paths.insert(0, Path(env_path))
for path in search_paths:
if path.exists():
return path
return None
def _load_config(self) -> None:
"""Lädt die Konfiguration aus der YAML-Datei."""
try:
with open(self._config_path, 'r', encoding='utf-8') as f:
user_config = yaml.safe_load(f) or {}
# Rekursives Merge der Konfiguration
self._config = self._deep_merge(self._config, user_config)
except yaml.YAMLError as e:
raise ConfigError(f"Fehler beim Parsen der Konfiguration: {e}")
except IOError as e:
raise ConfigError(f"Fehler beim Lesen der Konfiguration: {e}")
def _deep_merge(self, base: dict, override: dict) -> dict:
"""Führt zwei Dictionaries rekursiv zusammen."""
result = base.copy()
for key, value in override.items():
if key in result and isinstance(result[key], dict) and isinstance(value, dict):
result[key] = self._deep_merge(result[key], value)
else:
result[key] = value
return result
def _validate_config(self) -> None:
"""Validiert die Konfiguration."""
# Paperless URL prüfen
url = self.get('paperless.url', '')
if not url:
raise ConfigError("Paperless URL muss konfiguriert werden")
# Token prüfen (kann auch über Umgebungsvariable kommen)
token = self.get('paperless.token', '') or os.environ.get('PAPERLESS_TOKEN', '')
if not token:
raise ConfigError(
"Paperless API-Token muss konfiguriert werden.\n"
"Setze 'paperless.token' in config.yaml oder die Umgebungsvariable PAPERLESS_TOKEN"
)
# Token aus Umgebungsvariable übernehmen falls nicht in Config
if not self.get('paperless.token'):
self._config['paperless']['token'] = token
def get(self, key: str, default: Any = None) -> Any:
"""
Holt einen Konfigurationswert über Punkt-Notation.
Args:
key: Schlüssel in Punkt-Notation, z.B. 'paperless.url'
default: Standardwert falls Schlüssel nicht existiert
Returns:
Der Konfigurationswert oder der Standardwert
"""
keys = key.split('.')
value = self._config
try:
for k in keys:
value = value[k]
return value
except (KeyError, TypeError):
return default
def __getitem__(self, key: str) -> Any:
"""Ermöglicht Zugriff via config['key']."""
value = self.get(key)
if value is None:
raise KeyError(key)
return value
@property
def paperless_url(self) -> str:
"""Paperless Base-URL."""
url = self.get('paperless.url', '')
return url.rstrip('/')
@property
def paperless_token(self) -> str:
"""Paperless API-Token."""
return self.get('paperless.token', '')
@property
def timeout(self) -> int:
"""Request-Timeout in Sekunden."""
return self.get('paperless.timeout', 30)
@property
def currency(self) -> str:
"""Standardwährung."""
return self.get('defaults.currency', 'CHF')
@property
def date_field(self) -> str:
"""Datumsfeld für Filterung."""
return self.get('defaults.date_field', 'archive_date')
@property
def output_format(self) -> str:
"""Standard-Ausgabeformat."""
return self.get('output.format', 'html')
@property
def output_path(self) -> Path:
"""Ausgabeverzeichnis."""
return Path(self.get('output.path', './output'))
@property
def cache_enabled(self) -> bool:
"""Cache aktiviert."""
return self.get('cache.enabled', True)
@property
def cache_path(self) -> Path:
"""Cache-Verzeichnis."""
return Path(self.get('cache.path', './.cache'))
@property
def cache_ttl(self) -> int:
"""Cache-Gültigkeit in Sekunden."""
return self.get('cache.ttl', 3600)
@property
def log_level(self) -> str:
"""Log-Level."""
return self.get('logging.level', 'INFO')
@property
def custom_field_names(self) -> dict:
"""Mapping der Custom Field Namen."""
return self.get('custom_fields', {})
def get_custom_field_name(self, internal_name: str) -> str:
"""Holt den Paperless-Feldnamen für ein internes Feld."""
return self.get(f'custom_fields.{internal_name}', internal_name)
# Globale Config-Instanz (lazy loading)
_config: Optional[Config] = None
def get_config(config_path: Optional[str] = None) -> Config:
"""
Holt die globale Konfiguration.
Args:
config_path: Optionaler Pfad zur Konfigurationsdatei
Returns:
Config-Instanz
"""
global _config
if _config is None or config_path is not None:
_config = Config(config_path)
return _config
def reset_config() -> None:
"""Setzt die globale Konfiguration zurück (für Tests)."""
global _config
_config = None
+78
View File
@@ -0,0 +1,78 @@
# Paperless Finance Report - Konfiguration
# Kopiere diese Datei nach config.yaml und passe die Werte an
paperless:
# URL deiner Paperless-ngx Installation
url: "http://localhost:8000"
# API-Token (erstellen unter: Einstellungen → Authentifizierungs-Tokens)
token: "YOUR_API_TOKEN_HERE"
# Timeout für API-Anfragen in Sekunden
timeout: 30
# Mapping der Custom Field Namen in Paperless
# Die Namen müssen exakt mit den in Paperless angelegten Feldern übereinstimmen
custom_fields:
betrag: "betrag"
rechnungsdatum: "rechnungsdatum"
kategorie: "kategorie"
zahlungsart: "zahlungsart"
periode: "periode"
notiz: "notiz"
# Standardeinstellungen
defaults:
# Währung für Beträge
currency: "CHF"
# Welches Datumsfeld für Zeitraumfilter verwendet werden soll
# Optionen: "archive_date", "created", "added", oder ein Custom Field Name
date_field: "archive_date"
# Standard-Tag für Rechnungen (Name, nicht ID)
invoice_tag: "rechnung"
# Tag-Namen die automatisch erkannt werden sollen
# Die IDs werden beim ersten Start automatisch ermittelt
tags:
- rechnung
- miete
- krankenkasse
- steuern
- versicherung
- nebenkosten
# Kategorien für Gruppierung (müssen in Paperless als Auswahl-Optionen existieren)
categories:
- Wohnen
- Gesundheit
- Mobilität
- Versicherungen
- Steuern
- Lebensmittel
- Freizeit
- Diverses
# Ausgabe-Einstellungen
output:
# Standard-Format: html, pdf, json, cli
format: "html"
# Verzeichnis für generierte Berichte
path: "./output"
# Dateiname-Muster (Platzhalter: {year}, {month}, {date}, {timestamp})
filename_pattern: "finanzbericht_{year}"
# Cache-Einstellungen
cache:
# Cache aktivieren
enabled: true
# Cache-Verzeichnis
path: "./.cache"
# Cache-Gültigkeit in Sekunden (Standard: 1 Stunde)
ttl: 3600
# Logging-Einstellungen
logging:
# Log-Level: DEBUG, INFO, WARNING, ERROR
level: "INFO"
# Log-Datei (leer = nur Konsole)
file: ""
# Farbige Ausgabe
colorize: true
+592
View File
@@ -0,0 +1,592 @@
"""
Daten-Extraktion und Aggregation für das Paperless Finance Report Tool.
Extrahiert Custom Fields aus Dokumenten und aggregiert die Daten
für verschiedene Gruppierungen.
"""
import logging
import re
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime
from decimal import Decimal, InvalidOperation
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from dateutil.parser import parse as parse_date
from config import Config, get_config
from paperless_client import PaperlessClient
logger = logging.getLogger(__name__)
@dataclass
class FinanceDocument:
"""Ein aufbereitetes Finanzdokument."""
id: int
title: str
archive_date: Optional[datetime] = None
created: Optional[datetime] = None
added: Optional[datetime] = None
# Paperless Metadata
correspondent: Optional[str] = None
correspondent_id: Optional[int] = None
document_type: Optional[str] = None
tags: List[str] = field(default_factory=list)
tag_ids: List[int] = field(default_factory=list)
# Custom Fields
betrag: Optional[Decimal] = None
rechnungsdatum: Optional[datetime] = None
kategorie: Optional[str] = None
zahlungsart: Optional[str] = None
periode: Optional[str] = None
notiz: Optional[str] = None
# URLs
web_url: Optional[str] = None
# Original-Daten
raw_data: Dict = field(default_factory=dict)
@property
def effective_date(self) -> Optional[datetime]:
"""Das effektive Datum (Rechnungsdatum oder Archivdatum)."""
return self.rechnungsdatum or self.archive_date
@property
def year(self) -> Optional[int]:
"""Jahr des effektiven Datums."""
date = self.effective_date
return date.year if date else None
@property
def month(self) -> Optional[int]:
"""Monat des effektiven Datums."""
date = self.effective_date
return date.month if date else None
@property
def month_year(self) -> Optional[str]:
"""Monat/Jahr als String (z.B. '2024-01')."""
date = self.effective_date
return date.strftime('%Y-%m') if date else None
@property
def quarter(self) -> Optional[str]:
"""Quartal als String (z.B. 'Q1 2024')."""
date = self.effective_date
if not date:
return None
q = (date.month - 1) // 3 + 1
return f"Q{q} {date.year}"
class DocumentExtractor:
"""Extrahiert und verarbeitet Dokumente aus Paperless."""
def __init__(self, client: PaperlessClient, config: Optional[Config] = None):
"""
Initialisiert den Extractor.
Args:
client: Paperless API Client
config: Konfiguration
"""
self.client = client
self.config = config or get_config()
self._custom_fields_map: Dict[str, int] = {}
def _build_custom_fields_map(self) -> None:
"""Baut ein Mapping von Feldnamen zu IDs."""
if self._custom_fields_map:
return
fields = self.client.get_custom_fields()
for field_id, field_def in fields.items():
name = field_def['name'].lower()
self._custom_fields_map[name] = field_id
def _parse_decimal(self, value: Any) -> Optional[Decimal]:
"""
Parst einen Wert zu Decimal.
Verarbeitet verschiedene Formate:
- 1234.56
- 1234,56
- 1'234.56 (Schweizer Format)
- CHF 1234.56
"""
if value is None:
return None
if isinstance(value, (int, float)):
return Decimal(str(value))
if isinstance(value, Decimal):
return value
if not isinstance(value, str):
return None
# String bereinigen
value = value.strip()
# Währungssymbole entfernen
value = re.sub(r'^(CHF|EUR|USD|Fr\.?)\s*', '', value, flags=re.IGNORECASE)
value = re.sub(r'\s*(CHF|EUR|USD|Fr\.?)$', '', value, flags=re.IGNORECASE)
# Tausender-Trennzeichen entfernen (Apostroph, Punkt als Tausender)
# Schweizer Format: 1'234.56 oder 1'234,56
if "'" in value:
value = value.replace("'", "")
# Deutsches/Schweizer Format mit Punkt als Tausender: 1.234,56
if re.match(r'^\d{1,3}(\.\d{3})+,\d{2}$', value):
value = value.replace(".", "").replace(",", ".")
# Komma als Dezimaltrennzeichen ohne Tausender
elif "," in value and "." not in value:
value = value.replace(",", ".")
try:
return Decimal(value)
except InvalidOperation:
logger.warning(f"Konnte Betrag nicht parsen: {value}")
return None
def _parse_date(self, value: Any) -> Optional[datetime]:
"""Parst einen Wert zu datetime."""
if value is None:
return None
if isinstance(value, datetime):
return value
if not isinstance(value, str):
return None
try:
return parse_date(value)
except (ValueError, TypeError):
logger.warning(f"Konnte Datum nicht parsen: {value}")
return None
def _get_custom_field_value(self, doc: dict, field_name: str) -> Any:
"""Holt den Wert eines Custom Fields aus einem Dokument."""
# Aus resolved fields
resolved = doc.get('custom_fields_resolved', {})
if field_name in resolved:
return resolved[field_name].get('value')
# Aus rohen custom_fields
self._build_custom_fields_map()
field_name_lower = field_name.lower()
for cf in doc.get('custom_fields', []):
field_id = cf.get('field')
# Prüfen ob ID zum gesuchten Feldnamen passt
for name, fid in self._custom_fields_map.items():
if fid == field_id and name == field_name_lower:
return cf.get('value')
return None
def extract_document(self, raw_doc: dict) -> FinanceDocument:
"""
Extrahiert ein aufbereitetes FinanceDocument aus den Rohdaten.
Args:
raw_doc: Rohes Dokument-Dictionary von der API
Returns:
FinanceDocument-Instanz
"""
# Custom Field Namen aus Config
cf_names = self.config.custom_field_names
# Basis-Daten
doc = FinanceDocument(
id=raw_doc['id'],
title=raw_doc.get('title', ''),
raw_data=raw_doc
)
# Datums-Felder
doc.archive_date = self._parse_date(raw_doc.get('archive_date'))
doc.created = self._parse_date(raw_doc.get('created'))
doc.added = self._parse_date(raw_doc.get('added'))
# Korrespondent
doc.correspondent_id = raw_doc.get('correspondent')
doc.correspondent = raw_doc.get('correspondent_name', '')
# Dokumenttyp
doc.document_type = raw_doc.get('document_type_name', '')
# Tags
doc.tag_ids = raw_doc.get('tags', [])
doc.tags = raw_doc.get('tag_names', [])
# URL
doc.web_url = raw_doc.get('web_url', '')
# Custom Fields
betrag_name = cf_names.get('betrag', 'betrag')
doc.betrag = self._parse_decimal(
self._get_custom_field_value(raw_doc, betrag_name)
)
datum_name = cf_names.get('rechnungsdatum', 'rechnungsdatum')
doc.rechnungsdatum = self._parse_date(
self._get_custom_field_value(raw_doc, datum_name)
)
kat_name = cf_names.get('kategorie', 'kategorie')
doc.kategorie = self._get_custom_field_value(raw_doc, kat_name)
zahl_name = cf_names.get('zahlungsart', 'zahlungsart')
doc.zahlungsart = self._get_custom_field_value(raw_doc, zahl_name)
periode_name = cf_names.get('periode', 'periode')
doc.periode = self._get_custom_field_value(raw_doc, periode_name)
notiz_name = cf_names.get('notiz', 'notiz')
doc.notiz = self._get_custom_field_value(raw_doc, notiz_name)
return doc
def extract_documents(self, raw_docs: List[dict]) -> List[FinanceDocument]:
"""
Extrahiert mehrere Dokumente.
Args:
raw_docs: Liste von Roh-Dokumenten
Returns:
Liste von FinanceDocument-Instanzen
"""
# Metadaten auflösen
resolved = self.client.resolve_all_metadata(raw_docs)
return [self.extract_document(doc) for doc in resolved]
@dataclass
class AggregationResult:
"""Ergebnis einer Aggregation."""
# Basis-Statistiken
total_amount: Decimal = Decimal('0')
document_count: int = 0
documents_with_amount: int = 0
documents_without_amount: int = 0
# Dokumente
documents: List[FinanceDocument] = field(default_factory=list)
# Gruppierte Daten
by_tag: Dict[str, 'GroupStats'] = field(default_factory=dict)
by_correspondent: Dict[str, 'GroupStats'] = field(default_factory=dict)
by_category: Dict[str, 'GroupStats'] = field(default_factory=dict)
by_payment_type: Dict[str, 'GroupStats'] = field(default_factory=dict)
by_month: Dict[str, 'GroupStats'] = field(default_factory=dict)
by_quarter: Dict[str, 'GroupStats'] = field(default_factory=dict)
by_year: Dict[int, 'GroupStats'] = field(default_factory=dict)
# Zusätzliche Statistiken
average_amount: Decimal = Decimal('0')
median_amount: Decimal = Decimal('0')
min_amount: Decimal = Decimal('0')
max_amount: Decimal = Decimal('0')
top_items: List[FinanceDocument] = field(default_factory=list)
@property
def total_formatted(self) -> str:
"""Formatierte Gesamtsumme."""
return f"{self.total_amount:,.2f}".replace(',', "'")
@dataclass
class GroupStats:
"""Statistiken für eine Gruppe."""
name: str
amount: Decimal = Decimal('0')
count: int = 0
percentage: float = 0.0
documents: List[FinanceDocument] = field(default_factory=list)
@property
def amount_formatted(self) -> str:
"""Formatierter Betrag."""
return f"{self.amount:,.2f}".replace(',', "'")
class DataAggregator:
"""Aggregiert Finanzdokumente nach verschiedenen Kriterien."""
def __init__(self, config: Optional[Config] = None):
"""
Initialisiert den Aggregator.
Args:
config: Konfiguration
"""
self.config = config or get_config()
def aggregate(
self,
documents: List[FinanceDocument],
group_by: Optional[List[str]] = None
) -> AggregationResult:
"""
Aggregiert Dokumente.
Args:
documents: Liste von Dokumenten
group_by: Liste von Gruppierungskriterien:
'tag', 'correspondent', 'category', 'payment_type',
'month', 'quarter', 'year'
Returns:
AggregationResult mit allen Statistiken
"""
result = AggregationResult()
result.documents = documents
result.document_count = len(documents)
# Beträge sammeln
amounts: List[Decimal] = []
for doc in documents:
if doc.betrag is not None:
result.total_amount += doc.betrag
result.documents_with_amount += 1
amounts.append(doc.betrag)
else:
result.documents_without_amount += 1
# Basis-Statistiken
if amounts:
amounts_sorted = sorted(amounts)
result.min_amount = amounts_sorted[0]
result.max_amount = amounts_sorted[-1]
result.average_amount = result.total_amount / len(amounts)
# Median
mid = len(amounts_sorted) // 2
if len(amounts_sorted) % 2 == 0:
result.median_amount = (amounts_sorted[mid - 1] + amounts_sorted[mid]) / 2
else:
result.median_amount = amounts_sorted[mid]
# Top-Posten
docs_with_amount = [d for d in documents if d.betrag is not None]
result.top_items = sorted(
docs_with_amount,
key=lambda d: d.betrag or Decimal('0'),
reverse=True
)[:10]
# Gruppierungen
group_by = group_by or ['tag', 'correspondent', 'category', 'month']
if 'tag' in group_by:
result.by_tag = self._group_by_tags(documents, result.total_amount)
if 'correspondent' in group_by:
result.by_correspondent = self._group_by_field(
documents, 'correspondent', result.total_amount
)
if 'category' in group_by:
result.by_category = self._group_by_field(
documents, 'kategorie', result.total_amount
)
if 'payment_type' in group_by:
result.by_payment_type = self._group_by_field(
documents, 'zahlungsart', result.total_amount
)
if 'month' in group_by:
result.by_month = self._group_by_field(
documents, 'month_year', result.total_amount
)
if 'quarter' in group_by:
result.by_quarter = self._group_by_field(
documents, 'quarter', result.total_amount
)
if 'year' in group_by:
result.by_year = self._group_by_field(
documents, 'year', result.total_amount
)
return result
def _group_by_tags(
self,
documents: List[FinanceDocument],
total: Decimal
) -> Dict[str, GroupStats]:
"""Gruppiert nach Tags (ein Dokument kann mehrere Tags haben)."""
groups: Dict[str, GroupStats] = {}
for doc in documents:
if not doc.tags:
tag_name = 'Ohne Tag'
if tag_name not in groups:
groups[tag_name] = GroupStats(name=tag_name)
groups[tag_name].count += 1
if doc.betrag:
groups[tag_name].amount += doc.betrag
groups[tag_name].documents.append(doc)
else:
for tag in doc.tags:
if tag not in groups:
groups[tag] = GroupStats(name=tag)
groups[tag].count += 1
if doc.betrag:
groups[tag].amount += doc.betrag
groups[tag].documents.append(doc)
# Prozente berechnen
if total > 0:
for stats in groups.values():
stats.percentage = float(stats.amount / total * 100)
# Nach Betrag sortieren
return dict(sorted(
groups.items(),
key=lambda x: x[1].amount,
reverse=True
))
def _group_by_field(
self,
documents: List[FinanceDocument],
field: str,
total: Decimal
) -> Dict[str, GroupStats]:
"""Gruppiert nach einem einzelnen Feld."""
groups: Dict[str, GroupStats] = {}
for doc in documents:
value = getattr(doc, field, None)
if value is None or value == '':
key = 'Nicht zugeordnet'
else:
key = str(value)
if key not in groups:
groups[key] = GroupStats(name=key)
groups[key].count += 1
if doc.betrag:
groups[key].amount += doc.betrag
groups[key].documents.append(doc)
# Prozente berechnen
if total > 0:
for stats in groups.values():
stats.percentage = float(stats.amount / total * 100)
# Nach Betrag sortieren (bei Monaten chronologisch)
if field in ('month_year', 'quarter'):
return dict(sorted(groups.items()))
else:
return dict(sorted(
groups.items(),
key=lambda x: x[1].amount,
reverse=True
))
def compare_periods(
self,
documents: List[FinanceDocument],
period1: Union[int, str],
period2: Union[int, str],
period_type: str = 'year'
) -> Dict[str, Any]:
"""
Vergleicht zwei Zeiträume.
Args:
documents: Alle Dokumente
period1: Erste Periode (z.B. 2023)
period2: Zweite Periode (z.B. 2024)
period_type: 'year', 'quarter', 'month'
Returns:
Vergleichsergebnis
"""
# Dokumente nach Periode filtern
def get_period(doc: FinanceDocument) -> Optional[Union[int, str]]:
if period_type == 'year':
return doc.year
elif period_type == 'quarter':
return doc.quarter
elif period_type == 'month':
return doc.month_year
return None
docs1 = [d for d in documents if get_period(d) == period1]
docs2 = [d for d in documents if get_period(d) == period2]
agg1 = self.aggregate(docs1, ['tag', 'category'])
agg2 = self.aggregate(docs2, ['tag', 'category'])
# Differenzen berechnen
diff_absolute = agg2.total_amount - agg1.total_amount
diff_percent = (
float(diff_absolute / agg1.total_amount * 100)
if agg1.total_amount > 0 else 0
)
# Kategorien vergleichen
category_comparison = {}
all_categories = set(agg1.by_category.keys()) | set(agg2.by_category.keys())
for cat in all_categories:
stats1 = agg1.by_category.get(cat, GroupStats(name=cat))
stats2 = agg2.by_category.get(cat, GroupStats(name=cat))
diff = stats2.amount - stats1.amount
pct_change = (
float(diff / stats1.amount * 100)
if stats1.amount > 0 else (100.0 if stats2.amount > 0 else 0)
)
category_comparison[cat] = {
'period1': stats1.amount,
'period2': stats2.amount,
'diff_absolute': diff,
'diff_percent': pct_change,
'status': 'new' if stats1.amount == 0 else (
'removed' if stats2.amount == 0 else 'changed'
)
}
return {
'period1': {
'name': str(period1),
'total': agg1.total_amount,
'count': agg1.document_count,
'aggregation': agg1,
},
'period2': {
'name': str(period2),
'total': agg2.total_amount,
'count': agg2.document_count,
'aggregation': agg2,
},
'diff_absolute': diff_absolute,
'diff_percent': diff_percent,
'category_comparison': category_comparison,
}
+489
View File
@@ -0,0 +1,489 @@
#!/usr/bin/env python3
"""
Paperless Finance Report Tool
CLI-Einstiegspunkt für das Paperless Finanz-Auswertungstool.
Generiert Finanzberichte aus Paperless-ngx Dokumenten.
"""
import logging
import sys
from datetime import datetime
from pathlib import Path
from typing import List, Optional
import click
from tabulate import tabulate
# Lokale Imports
from config import Config, ConfigError, get_config, reset_config
from extractor import DataAggregator, DocumentExtractor
from paperless_client import PaperlessAPIError, PaperlessClient
from report_generator import ReportGenerator
# Logger einrichten
logger = logging.getLogger('paperless_report')
def setup_logging(level: str = 'INFO', colorize: bool = True) -> None:
"""Richtet das Logging ein."""
log_level = getattr(logging, level.upper(), logging.INFO)
if colorize:
try:
import colorlog
handler = colorlog.StreamHandler()
handler.setFormatter(colorlog.ColoredFormatter(
'%(log_color)s%(levelname)-8s%(reset)s %(message)s',
log_colors={
'DEBUG': 'cyan',
'INFO': 'green',
'WARNING': 'yellow',
'ERROR': 'red',
'CRITICAL': 'red,bg_white',
}
))
except ImportError:
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(levelname)-8s %(message)s'))
else:
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(levelname)-8s %(message)s'))
logger.addHandler(handler)
logger.setLevel(log_level)
# Auch für andere Module
logging.getLogger('paperless_report').setLevel(log_level)
def get_cache(config: Config):
"""Erstellt den Cache falls aktiviert."""
if not config.cache_enabled:
return None
try:
from diskcache import Cache
cache_path = config.cache_path
cache_path.mkdir(parents=True, exist_ok=True)
return Cache(str(cache_path))
except ImportError:
logger.warning("diskcache nicht installiert, Cache deaktiviert")
return None
# CLI-Gruppe
@click.group()
@click.option('--config', '-c', 'config_path', type=click.Path(exists=True),
help='Pfad zur Konfigurationsdatei')
@click.option('--verbose', '-v', is_flag=True, help='Ausführliche Ausgabe')
@click.option('--quiet', '-q', is_flag=True, help='Nur Fehler ausgeben')
@click.pass_context
def cli(ctx, config_path: Optional[str], verbose: bool, quiet: bool):
"""
Paperless Finance Report Tool
Generiert Finanzberichte aus Paperless-ngx Dokumenten.
Beispiele:
# Jahresbericht 2024
paperless-report report --year 2024
# Mit Tag-Filter
paperless-report report --year 2024 --tag rechnung
# Jahresvergleich
paperless-report compare 2023 2024
# Verbindung testen
paperless-report test
"""
ctx.ensure_object(dict)
# Log-Level bestimmen
if quiet:
log_level = 'ERROR'
elif verbose:
log_level = 'DEBUG'
else:
log_level = 'INFO'
setup_logging(log_level)
# Config laden
try:
reset_config()
config = get_config(config_path)
ctx.obj['config'] = config
except ConfigError as e:
click.echo(f"Konfigurationsfehler: {e}", err=True)
sys.exit(1)
@cli.command()
@click.pass_context
def test(ctx):
"""Testet die Verbindung zur Paperless-API."""
config = ctx.obj['config']
click.echo(f"Teste Verbindung zu {config.paperless_url}...")
try:
cache = get_cache(config)
client = PaperlessClient(config, cache)
if client.test_connection():
click.echo(click.style("Verbindung erfolgreich!", fg='green'))
# Statistiken anzeigen
click.echo("\nStatistiken:")
tags = client.get_tags()
correspondents = client.get_correspondents()
custom_fields = client.get_custom_fields()
click.echo(f" Tags: {len(tags)}")
click.echo(f" Korrespondenten: {len(correspondents)}")
click.echo(f" Custom Fields: {len(custom_fields)}")
# Custom Fields auflisten
if custom_fields:
click.echo("\nCustom Fields:")
for field_id, field in custom_fields.items():
click.echo(f" - {field['name']} (Typ: {field.get('data_type', 'unknown')})")
else:
click.echo(click.style("Verbindung fehlgeschlagen!", fg='red'))
sys.exit(1)
except PaperlessAPIError as e:
click.echo(click.style(f"API-Fehler: {e}", fg='red'), err=True)
sys.exit(1)
@cli.command()
@click.option('--year', '-y', type=int, help='Jahr für den Bericht')
@click.option('--month', '-m', type=int, help='Monat (1-12)')
@click.option('--tag', '-t', 'tags', multiple=True, help='Nach Tag filtern (mehrfach möglich)')
@click.option('--correspondent', help='Nach Korrespondent filtern')
@click.option('--group-by', '-g', 'group_by',
type=click.Choice(['tag', 'correspondent', 'category', 'payment_type', 'month', 'quarter', 'year']),
multiple=True, default=['tag', 'category', 'month'],
help='Gruppierung (mehrfach möglich)')
@click.option('--format', '-f', 'output_format',
type=click.Choice(['cli', 'html', 'pdf', 'json', 'csv']),
default='cli', help='Ausgabeformat')
@click.option('--output', '-o', 'output_file', type=click.Path(),
help='Ausgabedatei (optional)')
@click.option('--detail', '-d', is_flag=True, help='Detaillierte Ausgabe')
@click.option('--no-cache', is_flag=True, help='Cache ignorieren')
@click.pass_context
def report(ctx, year: Optional[int], month: Optional[int], tags: tuple,
correspondent: Optional[str], group_by: tuple, output_format: str,
output_file: Optional[str], detail: bool, no_cache: bool):
"""
Generiert einen Finanzbericht.
Beispiele:
# Jahresbericht 2024 als CLI
paperless-report report --year 2024
# HTML-Bericht mit Tag-Filter
paperless-report report --year 2024 --tag rechnung --format html
# Detaillierter Bericht nach Korrespondent gruppiert
paperless-report report --year 2024 --group-by correspondent --detail
# PDF für einen bestimmten Monat
paperless-report report --year 2024 --month 6 --format pdf
"""
config = ctx.obj['config']
# Standard: aktuelles Jahr
if not year:
year = datetime.now().year
click.echo(f"Kein Jahr angegeben, verwende {year}")
try:
cache = None if no_cache else get_cache(config)
client = PaperlessClient(config, cache)
extractor = DocumentExtractor(client, config)
aggregator = DataAggregator(config)
generator = ReportGenerator(config)
# Dokumente abrufen
click.echo(f"Lade Dokumente für {year}" + (f"/{month}" if month else "") + "...")
with click.progressbar(length=1, label='API-Abfrage') as bar:
raw_docs = client.get_documents(
tags=list(tags) if tags else None,
correspondent=correspondent,
year=year,
month=month,
)
bar.update(1)
if not raw_docs:
click.echo(click.style("Keine Dokumente gefunden.", fg='yellow'))
return
click.echo(f"Gefunden: {len(raw_docs)} Dokumente")
# Dokumente extrahieren
click.echo("Extrahiere Daten...")
documents = extractor.extract_documents(raw_docs)
# Aggregieren
click.echo("Aggregiere Daten...")
result = aggregator.aggregate(documents, list(group_by))
# Titel generieren
if month:
title = f"Paperless Finanzbericht {month:02d}/{year}"
else:
title = f"Paperless Finanzbericht {year}"
# Ausgabe
if output_format == 'cli':
output = generator.generate_cli(result, title, detail)
click.echo()
click.echo(output)
elif output_format == 'html':
if output_file:
path = Path(output_file)
else:
path = generator.save_html(result, title, year, month)
click.echo(click.style(f"HTML-Bericht gespeichert: {path}", fg='green'))
# Bericht öffnen?
if click.confirm("Bericht im Browser öffnen?", default=True):
import webbrowser
webbrowser.open(f"file://{path.absolute()}")
elif output_format == 'pdf':
if output_file:
path = Path(output_file)
pdf_bytes = generator.generate_pdf(result, title, year, month)
with open(path, 'wb') as f:
f.write(pdf_bytes)
else:
path = generator.save_pdf(result, title, year, month)
click.echo(click.style(f"PDF-Bericht gespeichert: {path}", fg='green'))
elif output_format == 'json':
if output_file:
path = Path(output_file)
json_str = generator.generate_json(result)
with open(path, 'w', encoding='utf-8') as f:
f.write(json_str)
else:
path = generator.save_json(result, year, month)
click.echo(click.style(f"JSON-Export gespeichert: {path}", fg='green'))
elif output_format == 'csv':
if output_file:
path = Path(output_file)
csv_str = generator.generate_csv(documents)
with open(path, 'w', encoding='utf-8-sig') as f:
f.write(csv_str)
else:
path = generator.save_csv(documents, year, month)
click.echo(click.style(f"CSV-Export gespeichert: {path}", fg='green'))
except PaperlessAPIError as e:
click.echo(click.style(f"API-Fehler: {e}", fg='red'), err=True)
sys.exit(1)
except Exception as e:
logger.exception("Unerwarteter Fehler")
click.echo(click.style(f"Fehler: {e}", fg='red'), err=True)
sys.exit(1)
@cli.command()
@click.argument('period1', type=int)
@click.argument('period2', type=int)
@click.option('--tag', '-t', 'tags', multiple=True, help='Nach Tag filtern')
@click.option('--format', '-f', 'output_format',
type=click.Choice(['cli', 'html']), default='cli',
help='Ausgabeformat')
@click.option('--output', '-o', 'output_file', type=click.Path(),
help='Ausgabedatei')
@click.pass_context
def compare(ctx, period1: int, period2: int, tags: tuple,
output_format: str, output_file: Optional[str]):
"""
Vergleicht zwei Zeiträume (Jahre).
Beispiele:
# Jahresvergleich 2023 vs 2024
paperless-report compare 2023 2024
# Mit Tag-Filter
paperless-report compare 2023 2024 --tag rechnung
# Als HTML
paperless-report compare 2023 2024 --format html
"""
config = ctx.obj['config']
try:
cache = get_cache(config)
client = PaperlessClient(config, cache)
extractor = DocumentExtractor(client, config)
aggregator = DataAggregator(config)
generator = ReportGenerator(config)
# Dokumente für beide Perioden laden
click.echo(f"Lade Dokumente für {period1} und {period2}...")
raw_docs_1 = client.get_documents(
tags=list(tags) if tags else None,
year=period1
)
raw_docs_2 = client.get_documents(
tags=list(tags) if tags else None,
year=period2
)
click.echo(f"Gefunden: {len(raw_docs_1)} ({period1}) / {len(raw_docs_2)} ({period2})")
# Dokumente zusammenführen und extrahieren
all_raw_docs = raw_docs_1 + raw_docs_2
all_docs = extractor.extract_documents(all_raw_docs)
# Vergleich
click.echo("Vergleiche Perioden...")
comparison = aggregator.compare_periods(all_docs, period1, period2)
if output_format == 'cli':
output = generator.generate_comparison_cli(comparison)
click.echo()
click.echo(output)
elif output_format == 'html':
# Aggregation für das neuere Jahr als Basis
docs_2 = [d for d in all_docs if d.year == period2]
result = aggregator.aggregate(docs_2, ['tag', 'category', 'month'])
title = f"Vergleich {period1} vs {period2}"
if output_file:
path = Path(output_file)
html = generator.generate_html(result, title, period2, comparison=comparison)
with open(path, 'w', encoding='utf-8') as f:
f.write(html)
else:
path = generator.save_html(result, title, period2, comparison=comparison)
click.echo(click.style(f"Vergleichsbericht gespeichert: {path}", fg='green'))
except PaperlessAPIError as e:
click.echo(click.style(f"API-Fehler: {e}", fg='red'), err=True)
sys.exit(1)
@cli.command()
@click.option('--tag', '-t', 'tags', multiple=True, help='Nach Tag filtern')
@click.option('--year', '-y', type=int, help='Jahr')
@click.option('--limit', '-l', type=int, default=20, help='Anzahl Dokumente')
@click.pass_context
def list_docs(ctx, tags: tuple, year: Optional[int], limit: int):
"""
Listet Dokumente auf.
Beispiele:
# Letzte 20 Dokumente
paperless-report list-docs
# Mit Tag-Filter
paperless-report list-docs --tag rechnung --limit 50
"""
config = ctx.obj['config']
try:
cache = get_cache(config)
client = PaperlessClient(config, cache)
extractor = DocumentExtractor(client, config)
raw_docs = client.get_documents(
tags=list(tags) if tags else None,
year=year
)
if not raw_docs:
click.echo("Keine Dokumente gefunden.")
return
documents = extractor.extract_documents(raw_docs[:limit])
# Tabelle erstellen
table_data = []
for doc in documents:
table_data.append([
doc.id,
(doc.effective_date.strftime('%d.%m.%Y')
if doc.effective_date else '-'),
doc.title[:40] + ('...' if len(doc.title) > 40 else ''),
doc.correspondent[:20] if doc.correspondent else '-',
(f"{config.currency} {doc.betrag:,.2f}".replace(',', "'")
if doc.betrag else '-'),
])
headers = ['ID', 'Datum', 'Titel', 'Korrespondent', 'Betrag']
click.echo(tabulate(table_data, headers=headers, tablefmt='simple'))
click.echo(f"\nGesamt: {len(raw_docs)} Dokumente (zeige {min(limit, len(raw_docs))})")
except PaperlessAPIError as e:
click.echo(click.style(f"API-Fehler: {e}", fg='red'), err=True)
sys.exit(1)
@cli.command()
@click.pass_context
def clear_cache(ctx):
"""Löscht den Cache."""
config = ctx.obj['config']
cache_path = config.cache_path
if cache_path.exists():
import shutil
shutil.rmtree(cache_path)
click.echo(click.style("Cache gelöscht.", fg='green'))
else:
click.echo("Kein Cache vorhanden.")
@cli.command()
@click.pass_context
def init(ctx):
"""Erstellt eine Beispiel-Konfigurationsdatei."""
config_file = Path.cwd() / 'config.yaml'
if config_file.exists():
if not click.confirm(f"{config_file} existiert bereits. Überschreiben?"):
return
# Beispiel-Config kopieren
example_config = Path(__file__).parent / 'config.yaml.example'
if example_config.exists():
import shutil
shutil.copy(example_config, config_file)
click.echo(click.style(f"Konfiguration erstellt: {config_file}", fg='green'))
click.echo("\nBitte bearbeite die Datei und setze:")
click.echo(" - paperless.url: URL deiner Paperless-Installation")
click.echo(" - paperless.token: API-Token")
else:
click.echo(click.style("Beispiel-Konfiguration nicht gefunden.", fg='red'))
def main():
"""Haupteinstiegspunkt."""
cli(obj={})
if __name__ == '__main__':
main()
+1
View File
@@ -0,0 +1 @@
# Dieses Verzeichnis enthält generierte Berichte
+537
View File
@@ -0,0 +1,537 @@
"""
Paperless-ngx API Client.
Handhabt die Kommunikation mit der Paperless REST-API inkl. Paginierung und Caching.
"""
import hashlib
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional, Union
from urllib.parse import urlencode, urljoin
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from config import Config, get_config
logger = logging.getLogger(__name__)
class PaperlessAPIError(Exception):
"""Fehler bei der API-Kommunikation."""
def __init__(self, message: str, status_code: Optional[int] = None, response: Optional[dict] = None):
super().__init__(message)
self.status_code = status_code
self.response = response
class PaperlessClient:
"""Client für die Paperless-ngx REST-API."""
# API-Endpunkte
ENDPOINTS = {
'documents': '/api/documents/',
'tags': '/api/tags/',
'correspondents': '/api/correspondents/',
'document_types': '/api/document_types/',
'custom_fields': '/api/custom_fields/',
'storage_paths': '/api/storage_paths/',
}
def __init__(self, config: Optional[Config] = None, cache: Optional[Any] = None):
"""
Initialisiert den API-Client.
Args:
config: Konfigurationsobjekt. Falls None, wird globale Config verwendet.
cache: Optionales Cache-Objekt (diskcache.Cache)
"""
self.config = config or get_config()
self.base_url = self.config.paperless_url
self.token = self.config.paperless_token
self.timeout = self.config.timeout
self.cache = cache
# Session mit Retry-Logik erstellen
self.session = self._create_session()
# Cached Metadata
self._custom_fields_cache: Optional[Dict[int, dict]] = None
self._tags_cache: Optional[Dict[int, dict]] = None
self._correspondents_cache: Optional[Dict[int, dict]] = None
self._document_types_cache: Optional[Dict[int, dict]] = None
def _create_session(self) -> requests.Session:
"""Erstellt eine Session mit Retry-Konfiguration."""
session = requests.Session()
# Retry-Strategie
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount('http://', adapter)
session.mount('https://', adapter)
# Standard-Header
session.headers.update({
'Authorization': f'Token {self.token}',
'Accept': 'application/json',
'Content-Type': 'application/json',
})
return session
def _get_cache_key(self, endpoint: str, params: Optional[dict] = None) -> str:
"""Generiert einen Cache-Schlüssel."""
key_data = f"{self.base_url}{endpoint}"
if params:
key_data += json.dumps(params, sort_keys=True)
return hashlib.md5(key_data.encode()).hexdigest()
def _request(
self,
method: str,
endpoint: str,
params: Optional[dict] = None,
data: Optional[dict] = None,
use_cache: bool = True
) -> dict:
"""
Führt einen API-Request durch.
Args:
method: HTTP-Methode (GET, POST, etc.)
endpoint: API-Endpunkt (relativ zur Base-URL)
params: Query-Parameter
data: Request-Body
use_cache: Cache verwenden (nur für GET)
Returns:
API-Response als Dictionary
"""
url = urljoin(self.base_url, endpoint)
# Cache prüfen (nur GET-Requests)
if method.upper() == 'GET' and use_cache and self.cache:
cache_key = self._get_cache_key(endpoint, params)
cached = self.cache.get(cache_key)
if cached is not None:
logger.debug(f"Cache hit für {endpoint}")
return cached
logger.debug(f"API Request: {method} {url} params={params}")
try:
response = self.session.request(
method=method,
url=url,
params=params,
json=data,
timeout=self.timeout
)
response.raise_for_status()
result = response.json()
# In Cache speichern (nur GET)
if method.upper() == 'GET' and use_cache and self.cache:
self.cache.set(cache_key, result, expire=self.config.cache_ttl)
return result
except requests.exceptions.HTTPError as e:
error_msg = f"HTTP-Fehler: {e}"
try:
error_detail = e.response.json()
error_msg = f"{error_msg} - {error_detail}"
except (ValueError, AttributeError):
pass
raise PaperlessAPIError(
error_msg,
status_code=e.response.status_code if e.response else None
)
except requests.exceptions.ConnectionError as e:
raise PaperlessAPIError(f"Verbindungsfehler: Kann {self.base_url} nicht erreichen")
except requests.exceptions.Timeout as e:
raise PaperlessAPIError(f"Timeout nach {self.timeout}s")
except requests.exceptions.RequestException as e:
raise PaperlessAPIError(f"Request-Fehler: {e}")
def _get_paginated(
self,
endpoint: str,
params: Optional[dict] = None,
page_size: int = 100
) -> Generator[dict, None, None]:
"""
Holt alle Seiten eines paginierten Endpunkts.
Args:
endpoint: API-Endpunkt
params: Zusätzliche Query-Parameter
page_size: Anzahl Ergebnisse pro Seite
Yields:
Einzelne Ergebnis-Objekte
"""
params = params or {}
params['page_size'] = page_size
page = 1
while True:
params['page'] = page
logger.debug(f"Lade Seite {page} von {endpoint}")
response = self._request('GET', endpoint, params=params)
results = response.get('results', [])
for item in results:
yield item
# Prüfen ob weitere Seiten existieren
if not response.get('next'):
break
page += 1
def test_connection(self) -> bool:
"""
Testet die Verbindung zur Paperless-API.
Returns:
True wenn Verbindung erfolgreich
"""
try:
self._request('GET', self.ENDPOINTS['tags'], params={'page_size': 1})
return True
except PaperlessAPIError:
return False
# ==================== Custom Fields ====================
def get_custom_fields(self, refresh: bool = False) -> Dict[int, dict]:
"""
Holt alle Custom Field Definitionen.
Args:
refresh: Cache ignorieren und neu laden
Returns:
Dictionary mit Field-ID als Key und Definition als Value
"""
if self._custom_fields_cache is not None and not refresh:
return self._custom_fields_cache
fields = {}
for field in self._get_paginated(self.ENDPOINTS['custom_fields']):
fields[field['id']] = field
self._custom_fields_cache = fields
logger.info(f"Geladen: {len(fields)} Custom Fields")
return fields
def get_custom_field_by_name(self, name: str) -> Optional[dict]:
"""
Findet ein Custom Field anhand des Namens.
Args:
name: Name des Custom Fields
Returns:
Field-Definition oder None
"""
fields = self.get_custom_fields()
for field in fields.values():
if field['name'].lower() == name.lower():
return field
return None
# ==================== Tags ====================
def get_tags(self, refresh: bool = False) -> Dict[int, dict]:
"""
Holt alle Tags.
Returns:
Dictionary mit Tag-ID als Key
"""
if self._tags_cache is not None and not refresh:
return self._tags_cache
tags = {}
for tag in self._get_paginated(self.ENDPOINTS['tags']):
tags[tag['id']] = tag
self._tags_cache = tags
logger.info(f"Geladen: {len(tags)} Tags")
return tags
def get_tag_by_name(self, name: str) -> Optional[dict]:
"""Findet einen Tag anhand des Namens."""
tags = self.get_tags()
for tag in tags.values():
if tag['name'].lower() == name.lower():
return tag
return None
def get_tag_id(self, name: str) -> Optional[int]:
"""Holt die ID eines Tags anhand des Namens."""
tag = self.get_tag_by_name(name)
return tag['id'] if tag else None
# ==================== Correspondents ====================
def get_correspondents(self, refresh: bool = False) -> Dict[int, dict]:
"""
Holt alle Korrespondenten.
Returns:
Dictionary mit Correspondent-ID als Key
"""
if self._correspondents_cache is not None and not refresh:
return self._correspondents_cache
correspondents = {}
for corr in self._get_paginated(self.ENDPOINTS['correspondents']):
correspondents[corr['id']] = corr
self._correspondents_cache = correspondents
logger.info(f"Geladen: {len(correspondents)} Korrespondenten")
return correspondents
def get_correspondent_name(self, correspondent_id: int) -> str:
"""Holt den Namen eines Korrespondenten."""
correspondents = self.get_correspondents()
corr = correspondents.get(correspondent_id)
return corr['name'] if corr else f"Unbekannt ({correspondent_id})"
# ==================== Document Types ====================
def get_document_types(self, refresh: bool = False) -> Dict[int, dict]:
"""Holt alle Dokumenttypen."""
if self._document_types_cache is not None and not refresh:
return self._document_types_cache
doc_types = {}
for dt in self._get_paginated(self.ENDPOINTS['document_types']):
doc_types[dt['id']] = dt
self._document_types_cache = doc_types
return doc_types
# ==================== Documents ====================
def get_documents(
self,
tags: Optional[List[Union[int, str]]] = None,
correspondent: Optional[Union[int, str]] = None,
document_type: Optional[Union[int, str]] = None,
year: Optional[int] = None,
month: Optional[int] = None,
date_from: Optional[datetime] = None,
date_to: Optional[datetime] = None,
query: Optional[str] = None,
ordering: str = '-archive_date',
**extra_filters
) -> List[dict]:
"""
Holt Dokumente mit optionalen Filtern.
Args:
tags: Liste von Tag-IDs oder Namen
correspondent: Korrespondent-ID oder Name
document_type: Dokumenttyp-ID oder Name
year: Jahr (für archive_date)
month: Monat (1-12, nur zusammen mit year)
date_from: Startdatum
date_to: Enddatum
query: Volltextsuche
ordering: Sortierung
**extra_filters: Zusätzliche Filter für die API
Returns:
Liste von Dokumenten
"""
params = {'ordering': ordering}
# Tags verarbeiten
if tags:
tag_ids = []
for tag in tags:
if isinstance(tag, int):
tag_ids.append(tag)
else:
tag_id = self.get_tag_id(tag)
if tag_id:
tag_ids.append(tag_id)
else:
logger.warning(f"Tag nicht gefunden: {tag}")
if tag_ids:
params['tags__id__in'] = ','.join(str(t) for t in tag_ids)
# Korrespondent
if correspondent:
if isinstance(correspondent, str):
correspondents = self.get_correspondents()
for c in correspondents.values():
if c['name'].lower() == correspondent.lower():
params['correspondent__id'] = c['id']
break
else:
params['correspondent__id'] = correspondent
# Dokumenttyp
if document_type:
if isinstance(document_type, str):
doc_types = self.get_document_types()
for dt in doc_types.values():
if dt['name'].lower() == document_type.lower():
params['document_type__id'] = dt['id']
break
else:
params['document_type__id'] = document_type
# Datumsfilter
date_field = self.config.date_field
if year:
if month:
# Spezifischer Monat
if month == 12:
next_year = year + 1
next_month = 1
else:
next_year = year
next_month = month + 1
params[f'{date_field}__gte'] = f'{year}-{month:02d}-01'
params[f'{date_field}__lt'] = f'{next_year}-{next_month:02d}-01'
else:
# Ganzes Jahr
params[f'{date_field}__year'] = year
if date_from:
params[f'{date_field}__gte'] = date_from.strftime('%Y-%m-%d')
if date_to:
params[f'{date_field}__lte'] = date_to.strftime('%Y-%m-%d')
# Volltextsuche
if query:
params['query'] = query
# Extra-Filter
params.update(extra_filters)
# Alle Dokumente abrufen
documents = list(self._get_paginated(self.ENDPOINTS['documents'], params))
logger.info(f"Geladen: {len(documents)} Dokumente")
return documents
def get_document(self, document_id: int) -> dict:
"""
Holt ein einzelnes Dokument.
Args:
document_id: ID des Dokuments
Returns:
Dokument-Dictionary
"""
endpoint = f"{self.ENDPOINTS['documents']}{document_id}/"
return self._request('GET', endpoint)
def get_document_url(self, document_id: int) -> str:
"""Generiert die Web-URL für ein Dokument."""
return f"{self.base_url}/documents/{document_id}/details"
def get_document_download_url(self, document_id: int) -> str:
"""Generiert die Download-URL für ein Dokument."""
return f"{self.base_url}/api/documents/{document_id}/download/"
# ==================== Hilfsmethoden ====================
def resolve_all_metadata(self, documents: List[dict]) -> List[dict]:
"""
Erweitert Dokumente um aufgelöste Metadaten (Tag-Namen, Korrespondent-Namen, etc.).
Args:
documents: Liste von Dokumenten
Returns:
Erweiterte Dokumente
"""
tags = self.get_tags()
correspondents = self.get_correspondents()
doc_types = self.get_document_types()
custom_fields = self.get_custom_fields()
for doc in documents:
# Tag-Namen
doc['tag_names'] = [
tags.get(tid, {}).get('name', f'Unknown-{tid}')
for tid in doc.get('tags', [])
]
# Korrespondent-Name
corr_id = doc.get('correspondent')
doc['correspondent_name'] = (
correspondents.get(corr_id, {}).get('name', '')
if corr_id else ''
)
# Dokumenttyp-Name
dt_id = doc.get('document_type')
doc['document_type_name'] = (
doc_types.get(dt_id, {}).get('name', '')
if dt_id else ''
)
# Custom Fields aufbereiten
doc['custom_fields_resolved'] = {}
for cf in doc.get('custom_fields', []):
field_id = cf.get('field')
field_def = custom_fields.get(field_id, {})
field_name = field_def.get('name', f'field_{field_id}')
doc['custom_fields_resolved'][field_name] = {
'value': cf.get('value'),
'type': field_def.get('data_type', 'string'),
'field_id': field_id
}
# URL hinzufügen
doc['web_url'] = self.get_document_url(doc['id'])
return documents
def get_statistics(self) -> dict:
"""
Holt allgemeine Statistiken.
Returns:
Dictionary mit Statistiken
"""
return {
'total_documents': len(list(self._get_paginated(
self.ENDPOINTS['documents'],
params={'page_size': 1}
))),
'total_tags': len(self.get_tags()),
'total_correspondents': len(self.get_correspondents()),
'total_custom_fields': len(self.get_custom_fields()),
}
+628
View File
@@ -0,0 +1,628 @@
"""
Report Generator für das Paperless Finance Report Tool.
Generiert Berichte in verschiedenen Formaten: CLI, HTML, PDF, JSON.
"""
import json
import logging
import os
from datetime import datetime
from decimal import Decimal
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from jinja2 import Environment, FileSystemLoader, select_autoescape
from config import Config, get_config
from extractor import AggregationResult, FinanceDocument, GroupStats
logger = logging.getLogger(__name__)
class DecimalEncoder(json.JSONEncoder):
"""JSON Encoder für Decimal-Werte."""
def default(self, obj):
if isinstance(obj, Decimal):
return float(obj)
if isinstance(obj, datetime):
return obj.isoformat()
if isinstance(obj, FinanceDocument):
return {
'id': obj.id,
'title': obj.title,
'betrag': float(obj.betrag) if obj.betrag else None,
'effective_date': obj.effective_date.isoformat() if obj.effective_date else None,
'correspondent': obj.correspondent,
'kategorie': obj.kategorie,
'tags': obj.tags,
'web_url': obj.web_url,
}
if isinstance(obj, GroupStats):
return {
'name': obj.name,
'amount': float(obj.amount),
'count': obj.count,
'percentage': obj.percentage,
}
return super().default(obj)
class ReportGenerator:
"""Generiert Finanzberichte in verschiedenen Formaten."""
def __init__(self, config: Optional[Config] = None):
"""
Initialisiert den Report Generator.
Args:
config: Konfiguration
"""
self.config = config or get_config()
self.currency = self.config.currency
# Jinja2 Template-Umgebung
template_dir = Path(__file__).parent / 'templates'
self.jinja_env = Environment(
loader=FileSystemLoader(str(template_dir)),
autoescape=select_autoescape(['html', 'xml']),
)
# Custom Filter registrieren
self.jinja_env.filters['format_amount'] = self._format_amount
self.jinja_env.filters['format_percent'] = self._format_percent
self.jinja_env.filters['format_date'] = self._format_date
def _format_amount(self, value: Optional[Decimal], with_currency: bool = True) -> str:
"""Formatiert einen Betrag."""
if value is None:
return '-'
formatted = f"{value:,.2f}".replace(',', "'")
if with_currency:
return f"{self.currency} {formatted}"
return formatted
def _format_percent(self, value: float) -> str:
"""Formatiert einen Prozentwert."""
return f"{value:.1f}%"
def _format_date(self, value: Optional[datetime], fmt: str = '%d.%m.%Y') -> str:
"""Formatiert ein Datum."""
if value is None:
return '-'
return value.strftime(fmt)
def _ensure_output_dir(self) -> Path:
"""Stellt sicher, dass das Ausgabeverzeichnis existiert."""
output_dir = self.config.output_path
output_dir.mkdir(parents=True, exist_ok=True)
return output_dir
def _get_output_filename(
self,
year: Optional[int] = None,
month: Optional[int] = None,
extension: str = 'html'
) -> str:
"""Generiert den Ausgabe-Dateinamen."""
pattern = self.config.get('output.filename_pattern', 'finanzbericht_{year}')
now = datetime.now()
filename = pattern.format(
year=year or now.year,
month=month or now.month,
date=now.strftime('%Y-%m-%d'),
timestamp=now.strftime('%Y%m%d_%H%M%S'),
)
return f"{filename}.{extension}"
# ==================== CLI Output ====================
def generate_cli(
self,
result: AggregationResult,
title: str = "Paperless Finanzbericht",
detail: bool = False
) -> str:
"""
Generiert CLI-Ausgabe.
Args:
result: Aggregationsergebnis
title: Berichtstitel
detail: Detailansicht aktivieren
Returns:
Formatierter String für CLI-Ausgabe
"""
lines = []
sep = "=" * 60
# Header
lines.append(sep)
lines.append(title.center(60))
lines.append(sep)
lines.append("")
# Übersicht
lines.append(f"Dokumente gesamt: {result.document_count}")
lines.append(f" - mit Betrag: {result.documents_with_amount}")
lines.append(f" - ohne Betrag: {result.documents_without_amount}")
lines.append("")
lines.append(f"Gesamtsumme: {self._format_amount(result.total_amount)}")
lines.append(f"Durchschnitt: {self._format_amount(result.average_amount)}")
lines.append(f"Median: {self._format_amount(result.median_amount)}")
lines.append(f"Minimum: {self._format_amount(result.min_amount)}")
lines.append(f"Maximum: {self._format_amount(result.max_amount)}")
lines.append("")
# Nach Tag
if result.by_tag:
lines.append("-" * 60)
lines.append("Nach Tag:")
lines.append("-" * 60)
for name, stats in result.by_tag.items():
amount_str = self._format_amount(stats.amount).rjust(18)
pct_str = f"({stats.percentage:5.1f}%)"
lines.append(f" {name:<25} {amount_str} {pct_str}")
lines.append("")
# Nach Korrespondent
if result.by_correspondent and detail:
lines.append("-" * 60)
lines.append("Nach Korrespondent:")
lines.append("-" * 60)
for name, stats in list(result.by_correspondent.items())[:15]:
amount_str = self._format_amount(stats.amount).rjust(18)
pct_str = f"({stats.percentage:5.1f}%)"
lines.append(f" {name[:25]:<25} {amount_str} {pct_str}")
if len(result.by_correspondent) > 15:
lines.append(f" ... und {len(result.by_correspondent) - 15} weitere")
lines.append("")
# Nach Kategorie
if result.by_category:
lines.append("-" * 60)
lines.append("Nach Kategorie:")
lines.append("-" * 60)
for name, stats in result.by_category.items():
amount_str = self._format_amount(stats.amount).rjust(18)
pct_str = f"({stats.percentage:5.1f}%)"
lines.append(f" {name[:25]:<25} {amount_str} {pct_str}")
lines.append("")
# Nach Monat
if result.by_month:
lines.append("-" * 60)
lines.append("Nach Monat:")
lines.append("-" * 60)
for month, stats in result.by_month.items():
amount_str = self._format_amount(stats.amount).rjust(18)
lines.append(f" {month:<10} {amount_str} ({stats.count} Dok.)")
lines.append("")
# Nach Zahlungsart
if result.by_payment_type and detail:
lines.append("-" * 60)
lines.append("Nach Zahlungsart:")
lines.append("-" * 60)
for name, stats in result.by_payment_type.items():
amount_str = self._format_amount(stats.amount).rjust(18)
pct_str = f"({stats.percentage:5.1f}%)"
lines.append(f" {name:<25} {amount_str} {pct_str}")
lines.append("")
# Top-Posten
if result.top_items and detail:
lines.append("-" * 60)
lines.append("Top 10 Einzelposten:")
lines.append("-" * 60)
for i, doc in enumerate(result.top_items[:10], 1):
amount_str = self._format_amount(doc.betrag).rjust(18)
title = doc.title[:35]
lines.append(f" {i:2}. {title:<35} {amount_str}")
lines.append("")
lines.append(sep)
lines.append(f"Generiert: {datetime.now().strftime('%d.%m.%Y %H:%M')}")
lines.append(sep)
return "\n".join(lines)
# ==================== HTML Output ====================
def generate_html(
self,
result: AggregationResult,
title: str = "Paperless Finanzbericht",
year: Optional[int] = None,
month: Optional[int] = None,
comparison: Optional[Dict] = None
) -> str:
"""
Generiert HTML-Bericht.
Args:
result: Aggregationsergebnis
title: Berichtstitel
year: Jahr für den Bericht
month: Monat für den Bericht (optional)
comparison: Vergleichsdaten (optional)
Returns:
HTML-String
"""
template = self.jinja_env.get_template('report.html')
# Chart-Daten vorbereiten
tag_chart_data = self._prepare_chart_data(result.by_tag)
category_chart_data = self._prepare_chart_data(result.by_category)
month_chart_data = self._prepare_line_chart_data(result.by_month)
correspondent_chart_data = self._prepare_chart_data(
dict(list(result.by_correspondent.items())[:10])
)
context = {
'title': title,
'year': year,
'month': month,
'currency': self.currency,
'generated_at': datetime.now(),
'result': result,
'comparison': comparison,
# Chart-Daten als JSON
'tag_chart_data': json.dumps(tag_chart_data),
'category_chart_data': json.dumps(category_chart_data),
'month_chart_data': json.dumps(month_chart_data),
'correspondent_chart_data': json.dumps(correspondent_chart_data),
}
return template.render(**context)
def _prepare_chart_data(self, groups: Dict[str, GroupStats]) -> Dict[str, Any]:
"""Bereitet Daten für ein Balken-/Kreisdiagramm vor."""
labels = []
values = []
colors = self._generate_colors(len(groups))
for name, stats in groups.items():
labels.append(name)
values.append(float(stats.amount))
return {
'labels': labels,
'values': values,
'colors': colors,
}
def _prepare_line_chart_data(self, groups: Dict[str, GroupStats]) -> Dict[str, Any]:
"""Bereitet Daten für ein Liniendiagramm vor."""
# Nach Datum sortieren
sorted_items = sorted(groups.items())
labels = [item[0] for item in sorted_items]
values = [float(item[1].amount) for item in sorted_items]
return {
'labels': labels,
'values': values,
}
def _generate_colors(self, count: int) -> List[str]:
"""Generiert eine Farbpalette."""
# Vordefinierte Farben
colors = [
'#2E86AB', # Blau
'#A23B72', # Magenta
'#F18F01', # Orange
'#C73E1D', # Rot
'#3B1F2B', # Dunkelrot
'#95C623', # Grün
'#5C5D67', # Grau
'#E8D21D', # Gelb
'#1B998B', # Türkis
'#7768AE', # Lila
]
# Farben wiederholen falls nötig
while len(colors) < count:
colors.extend(colors)
return colors[:count]
def save_html(
self,
result: AggregationResult,
title: str = "Paperless Finanzbericht",
year: Optional[int] = None,
month: Optional[int] = None,
comparison: Optional[Dict] = None,
filename: Optional[str] = None
) -> Path:
"""
Speichert HTML-Bericht als Datei.
Returns:
Pfad zur erstellten Datei
"""
html = self.generate_html(result, title, year, month, comparison)
output_dir = self._ensure_output_dir()
if filename is None:
filename = self._get_output_filename(year, month, 'html')
output_path = output_dir / filename
with open(output_path, 'w', encoding='utf-8') as f:
f.write(html)
logger.info(f"HTML-Bericht gespeichert: {output_path}")
return output_path
# ==================== PDF Output ====================
def generate_pdf(
self,
result: AggregationResult,
title: str = "Paperless Finanzbericht",
year: Optional[int] = None,
month: Optional[int] = None,
comparison: Optional[Dict] = None
) -> bytes:
"""
Generiert PDF-Bericht.
Returns:
PDF als Bytes
"""
try:
from weasyprint import HTML, CSS
except ImportError:
raise ImportError(
"WeasyPrint ist nicht installiert. "
"Installiere mit: pip install weasyprint"
)
# HTML generieren
html_content = self.generate_html(result, title, year, month, comparison)
# PDF generieren
html = HTML(string=html_content)
# Zusätzliches CSS für PDF
pdf_css = CSS(string='''
@page {
size: A4;
margin: 2cm;
}
body {
font-size: 10pt;
}
.chart-container {
page-break-inside: avoid;
}
table {
page-break-inside: avoid;
}
''')
return html.write_pdf(stylesheets=[pdf_css])
def save_pdf(
self,
result: AggregationResult,
title: str = "Paperless Finanzbericht",
year: Optional[int] = None,
month: Optional[int] = None,
comparison: Optional[Dict] = None,
filename: Optional[str] = None
) -> Path:
"""
Speichert PDF-Bericht als Datei.
Returns:
Pfad zur erstellten Datei
"""
pdf_bytes = self.generate_pdf(result, title, year, month, comparison)
output_dir = self._ensure_output_dir()
if filename is None:
filename = self._get_output_filename(year, month, 'pdf')
output_path = output_dir / filename
with open(output_path, 'wb') as f:
f.write(pdf_bytes)
logger.info(f"PDF-Bericht gespeichert: {output_path}")
return output_path
# ==================== JSON Output ====================
def generate_json(
self,
result: AggregationResult,
indent: int = 2
) -> str:
"""
Generiert JSON-Ausgabe.
Returns:
JSON-String
"""
data = {
'generated_at': datetime.now().isoformat(),
'currency': self.currency,
'summary': {
'total_amount': result.total_amount,
'document_count': result.document_count,
'documents_with_amount': result.documents_with_amount,
'documents_without_amount': result.documents_without_amount,
'average_amount': result.average_amount,
'median_amount': result.median_amount,
'min_amount': result.min_amount,
'max_amount': result.max_amount,
},
'by_tag': result.by_tag,
'by_correspondent': result.by_correspondent,
'by_category': result.by_category,
'by_payment_type': result.by_payment_type,
'by_month': result.by_month,
'top_items': result.top_items[:20],
'documents': result.documents,
}
return json.dumps(data, indent=indent, cls=DecimalEncoder, ensure_ascii=False)
def save_json(
self,
result: AggregationResult,
year: Optional[int] = None,
month: Optional[int] = None,
filename: Optional[str] = None
) -> Path:
"""
Speichert JSON-Bericht als Datei.
Returns:
Pfad zur erstellten Datei
"""
json_str = self.generate_json(result)
output_dir = self._ensure_output_dir()
if filename is None:
filename = self._get_output_filename(year, month, 'json')
output_path = output_dir / filename
with open(output_path, 'w', encoding='utf-8') as f:
f.write(json_str)
logger.info(f"JSON-Bericht gespeichert: {output_path}")
return output_path
# ==================== CSV Output ====================
def generate_csv(
self,
documents: List[FinanceDocument],
delimiter: str = ';'
) -> str:
"""
Generiert CSV-Export der Dokumente.
Returns:
CSV-String
"""
lines = []
# Header
headers = [
'ID', 'Titel', 'Datum', 'Betrag', 'Korrespondent',
'Kategorie', 'Zahlungsart', 'Tags', 'URL'
]
lines.append(delimiter.join(headers))
# Daten
for doc in documents:
row = [
str(doc.id),
f'"{doc.title}"' if delimiter in doc.title else doc.title,
self._format_date(doc.effective_date),
self._format_amount(doc.betrag, with_currency=False) if doc.betrag else '',
doc.correspondent or '',
doc.kategorie or '',
doc.zahlungsart or '',
', '.join(doc.tags),
doc.web_url or '',
]
lines.append(delimiter.join(row))
return '\n'.join(lines)
def save_csv(
self,
documents: List[FinanceDocument],
year: Optional[int] = None,
month: Optional[int] = None,
filename: Optional[str] = None
) -> Path:
"""Speichert CSV-Export als Datei."""
csv_str = self.generate_csv(documents)
output_dir = self._ensure_output_dir()
if filename is None:
filename = self._get_output_filename(year, month, 'csv')
output_path = output_dir / filename
with open(output_path, 'w', encoding='utf-8-sig') as f: # BOM für Excel
f.write(csv_str)
logger.info(f"CSV-Export gespeichert: {output_path}")
return output_path
# ==================== Vergleichsbericht ====================
def generate_comparison_cli(self, comparison: Dict) -> str:
"""Generiert CLI-Ausgabe für Periodenvergleich."""
lines = []
sep = "=" * 70
p1 = comparison['period1']
p2 = comparison['period2']
lines.append(sep)
lines.append(f"Vergleich: {p1['name']} vs {p2['name']}".center(70))
lines.append(sep)
lines.append("")
# Übersicht
lines.append(f"{'Kennzahl':<30} {p1['name']:>15} {p2['name']:>15} {'Diff':>10}")
lines.append("-" * 70)
lines.append(
f"{'Gesamtsumme':<30} "
f"{self._format_amount(p1['total'], False):>15} "
f"{self._format_amount(p2['total'], False):>15} "
f"{comparison['diff_percent']:>+9.1f}%"
)
lines.append(
f"{'Anzahl Dokumente':<30} "
f"{p1['count']:>15} "
f"{p2['count']:>15} "
f"{p2['count'] - p1['count']:>+10}"
)
lines.append("")
lines.append("-" * 70)
lines.append("Nach Kategorie:")
lines.append("-" * 70)
for cat, data in sorted(
comparison['category_comparison'].items(),
key=lambda x: abs(x[1]['diff_absolute']),
reverse=True
):
status = ""
if data['status'] == 'new':
status = "[NEU]"
elif data['status'] == 'removed':
status = "[ENTF]"
lines.append(
f" {cat[:25]:<25} "
f"{self._format_amount(data['period1'], False):>12} "
f"{self._format_amount(data['period2'], False):>12} "
f"{data['diff_percent']:>+8.1f}% "
f"{status}"
)
lines.append("")
lines.append(sep)
return "\n".join(lines)
+31
View File
@@ -0,0 +1,31 @@
# Paperless Finance Report Tool - Dependencies
# HTTP Client
requests>=2.31.0
# CLI Framework
click>=8.1.7
# Configuration
pyyaml>=6.0.1
# HTML Templating
jinja2>=3.1.2
# PDF Generation
weasyprint>=60.1
# Data Processing
python-dateutil>=2.8.2
# Caching
diskcache>=5.6.3
# Logging (colorized output)
colorlog>=6.8.0
# Progress bars
tqdm>=4.66.1
# Table formatting for CLI
tabulate>=0.9.0
+88
View File
@@ -0,0 +1,88 @@
#!/usr/bin/env python3
"""
Setup-Skript für das Paperless Finance Report Tool.
"""
from setuptools import setup, find_packages
from pathlib import Path
# README einlesen
readme_path = Path(__file__).parent / 'README.md'
long_description = ''
if readme_path.exists():
long_description = readme_path.read_text(encoding='utf-8')
setup(
name='paperless-report',
version='1.0.0',
description='Finanz-Auswertungstool für Paperless-ngx',
long_description=long_description,
long_description_content_type='text/markdown',
author='Your Name',
author_email='your.email@example.com',
url='https://github.com/yourusername/paperless-report',
license='MIT',
py_modules=[
'main',
'config',
'paperless_client',
'extractor',
'report_generator',
],
include_package_data=True,
package_data={
'': ['templates/*.html', 'config.yaml.example'],
},
install_requires=[
'requests>=2.31.0',
'click>=8.1.7',
'pyyaml>=6.0.1',
'jinja2>=3.1.2',
'python-dateutil>=2.8.2',
'tabulate>=0.9.0',
'tqdm>=4.66.1',
],
extras_require={
'full': [
'weasyprint>=60.1',
'diskcache>=5.6.3',
'colorlog>=6.8.0',
],
'dev': [
'pytest>=7.4.0',
'pytest-cov>=4.1.0',
'black>=23.7.0',
'flake8>=6.1.0',
'mypy>=1.5.0',
],
},
entry_points={
'console_scripts': [
'paperless-report=main:main',
],
},
python_requires='>=3.8',
classifiers=[
'Development Status :: 4 - Beta',
'Environment :: Console',
'Intended Audience :: End Users/Desktop',
'License :: OSI Approved :: MIT License',
'Operating System :: OS Independent',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
'Programming Language :: Python :: 3.12',
'Topic :: Office/Business :: Financial :: Accounting',
],
keywords='paperless paperless-ngx finance report accounting',
)
+848
View File
@@ -0,0 +1,848 @@
<!DOCTYPE html>
<html lang="de">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{{ title }}</title>
<script src="https://cdn.jsdelivr.net/npm/chart.js@4.4.1/dist/chart.umd.min.js"></script>
<style>
:root {
--primary-color: #2E86AB;
--secondary-color: #A23B72;
--success-color: #28a745;
--warning-color: #ffc107;
--danger-color: #dc3545;
--light-bg: #f8f9fa;
--border-color: #dee2e6;
--text-color: #212529;
--text-muted: #6c757d;
}
* {
box-sizing: border-box;
margin: 0;
padding: 0;
}
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, 'Helvetica Neue', Arial, sans-serif;
line-height: 1.6;
color: var(--text-color);
background-color: #fff;
padding: 20px;
max-width: 1400px;
margin: 0 auto;
}
h1, h2, h3 {
margin-bottom: 1rem;
color: var(--primary-color);
}
h1 {
font-size: 2rem;
border-bottom: 3px solid var(--primary-color);
padding-bottom: 0.5rem;
margin-bottom: 1.5rem;
}
h2 {
font-size: 1.5rem;
margin-top: 2rem;
border-bottom: 1px solid var(--border-color);
padding-bottom: 0.5rem;
}
.header {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 2rem;
}
.header-info {
text-align: right;
color: var(--text-muted);
font-size: 0.9rem;
}
.summary-cards {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
gap: 1rem;
margin-bottom: 2rem;
}
.card {
background: var(--light-bg);
border-radius: 8px;
padding: 1.5rem;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.card-title {
font-size: 0.85rem;
color: var(--text-muted);
text-transform: uppercase;
letter-spacing: 0.5px;
margin-bottom: 0.5rem;
}
.card-value {
font-size: 1.75rem;
font-weight: bold;
color: var(--primary-color);
}
.card-value.highlight {
color: var(--secondary-color);
}
.card-subtitle {
font-size: 0.8rem;
color: var(--text-muted);
margin-top: 0.25rem;
}
.charts-container {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(400px, 1fr));
gap: 2rem;
margin: 2rem 0;
}
.chart-wrapper {
background: var(--light-bg);
border-radius: 8px;
padding: 1.5rem;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.chart-wrapper h3 {
margin-bottom: 1rem;
font-size: 1.1rem;
}
.chart-container {
position: relative;
height: 300px;
}
table {
width: 100%;
border-collapse: collapse;
margin: 1rem 0;
font-size: 0.9rem;
}
th, td {
padding: 0.75rem;
text-align: left;
border-bottom: 1px solid var(--border-color);
}
th {
background-color: var(--light-bg);
font-weight: 600;
color: var(--text-muted);
text-transform: uppercase;
font-size: 0.8rem;
letter-spacing: 0.5px;
}
tr:hover {
background-color: var(--light-bg);
}
.text-right {
text-align: right;
}
.amount {
font-family: 'SF Mono', Consolas, monospace;
white-space: nowrap;
}
.amount-positive {
color: var(--danger-color);
}
.percentage {
color: var(--text-muted);
font-size: 0.85rem;
}
.progress-bar {
height: 8px;
background-color: #e9ecef;
border-radius: 4px;
overflow: hidden;
margin-top: 0.25rem;
}
.progress-bar-fill {
height: 100%;
background-color: var(--primary-color);
border-radius: 4px;
}
.tag {
display: inline-block;
padding: 0.2rem 0.6rem;
background-color: var(--primary-color);
color: white;
border-radius: 12px;
font-size: 0.75rem;
margin: 0.1rem;
}
.document-link {
color: var(--primary-color);
text-decoration: none;
}
.document-link:hover {
text-decoration: underline;
}
.section {
margin: 2rem 0;
}
.comparison-table {
margin-top: 1rem;
}
.comparison-table .change-positive {
color: var(--danger-color);
}
.comparison-table .change-negative {
color: var(--success-color);
}
.comparison-table .new-item {
background-color: #fff3cd;
}
.comparison-table .removed-item {
background-color: #f8d7da;
}
.export-buttons {
margin: 2rem 0;
display: flex;
gap: 1rem;
}
.btn {
display: inline-block;
padding: 0.5rem 1rem;
background-color: var(--primary-color);
color: white;
text-decoration: none;
border-radius: 4px;
border: none;
cursor: pointer;
font-size: 0.9rem;
}
.btn:hover {
opacity: 0.9;
}
.btn-secondary {
background-color: var(--text-muted);
}
.footer {
margin-top: 3rem;
padding-top: 1rem;
border-top: 1px solid var(--border-color);
text-align: center;
color: var(--text-muted);
font-size: 0.85rem;
}
@media print {
body {
padding: 0;
}
.export-buttons {
display: none;
}
.chart-wrapper {
page-break-inside: avoid;
}
table {
page-break-inside: avoid;
}
}
@media (max-width: 768px) {
.charts-container {
grid-template-columns: 1fr;
}
.summary-cards {
grid-template-columns: 1fr 1fr;
}
}
</style>
</head>
<body>
<div class="header">
<h1>{{ title }}</h1>
<div class="header-info">
{% if year %}
<div><strong>Zeitraum:</strong>
{% if month %}
{{ '%02d'|format(month) }}/{{ year }}
{% else %}
{{ year }}
{% endif %}
</div>
{% endif %}
<div><strong>Generiert:</strong> {{ generated_at|format_date('%d.%m.%Y %H:%M') }}</div>
</div>
</div>
<!-- Zusammenfassung -->
<div class="summary-cards">
<div class="card">
<div class="card-title">Gesamtsumme</div>
<div class="card-value highlight">{{ result.total_amount|format_amount }}</div>
<div class="card-subtitle">{{ result.documents_with_amount }} Dokumente mit Betrag</div>
</div>
<div class="card">
<div class="card-title">Dokumente</div>
<div class="card-value">{{ result.document_count }}</div>
<div class="card-subtitle">{{ result.documents_without_amount }} ohne Betrag</div>
</div>
<div class="card">
<div class="card-title">Durchschnitt</div>
<div class="card-value">{{ result.average_amount|format_amount }}</div>
<div class="card-subtitle">pro Dokument</div>
</div>
<div class="card">
<div class="card-title">Median</div>
<div class="card-value">{{ result.median_amount|format_amount }}</div>
<div class="card-subtitle">Min: {{ result.min_amount|format_amount }}</div>
</div>
</div>
<!-- Diagramme -->
<div class="charts-container">
{% if result.by_tag %}
<div class="chart-wrapper">
<h3>Verteilung nach Tag</h3>
<div class="chart-container">
<canvas id="tagChart"></canvas>
</div>
</div>
{% endif %}
{% if result.by_category %}
<div class="chart-wrapper">
<h3>Verteilung nach Kategorie</h3>
<div class="chart-container">
<canvas id="categoryChart"></canvas>
</div>
</div>
{% endif %}
{% if result.by_month %}
<div class="chart-wrapper">
<h3>Monatsverlauf</h3>
<div class="chart-container">
<canvas id="monthChart"></canvas>
</div>
</div>
{% endif %}
{% if result.by_correspondent %}
<div class="chart-wrapper">
<h3>Top 10 Korrespondenten</h3>
<div class="chart-container">
<canvas id="correspondentChart"></canvas>
</div>
</div>
{% endif %}
</div>
<!-- Nach Tag -->
{% if result.by_tag %}
<div class="section">
<h2>Nach Tag</h2>
<table>
<thead>
<tr>
<th>Tag</th>
<th class="text-right">Betrag</th>
<th class="text-right">Anteil</th>
<th class="text-right">Anzahl</th>
<th style="width: 200px;">Verteilung</th>
</tr>
</thead>
<tbody>
{% for name, stats in result.by_tag.items() %}
<tr>
<td><span class="tag">{{ name }}</span></td>
<td class="text-right amount">{{ stats.amount|format_amount }}</td>
<td class="text-right percentage">{{ stats.percentage|format_percent }}</td>
<td class="text-right">{{ stats.count }}</td>
<td>
<div class="progress-bar">
<div class="progress-bar-fill" style="width: {{ stats.percentage }}%"></div>
</div>
</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% endif %}
<!-- Nach Kategorie -->
{% if result.by_category %}
<div class="section">
<h2>Nach Kategorie</h2>
<table>
<thead>
<tr>
<th>Kategorie</th>
<th class="text-right">Betrag</th>
<th class="text-right">Anteil</th>
<th class="text-right">Anzahl</th>
<th style="width: 200px;">Verteilung</th>
</tr>
</thead>
<tbody>
{% for name, stats in result.by_category.items() %}
<tr>
<td>{{ name }}</td>
<td class="text-right amount">{{ stats.amount|format_amount }}</td>
<td class="text-right percentage">{{ stats.percentage|format_percent }}</td>
<td class="text-right">{{ stats.count }}</td>
<td>
<div class="progress-bar">
<div class="progress-bar-fill" style="width: {{ stats.percentage }}%"></div>
</div>
</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% endif %}
<!-- Nach Monat -->
{% if result.by_month %}
<div class="section">
<h2>Nach Monat</h2>
<table>
<thead>
<tr>
<th>Monat</th>
<th class="text-right">Betrag</th>
<th class="text-right">Anzahl</th>
</tr>
</thead>
<tbody>
{% for month, stats in result.by_month.items() %}
<tr>
<td>{{ month }}</td>
<td class="text-right amount">{{ stats.amount|format_amount }}</td>
<td class="text-right">{{ stats.count }}</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% endif %}
<!-- Nach Korrespondent -->
{% if result.by_correspondent %}
<div class="section">
<h2>Nach Korrespondent</h2>
<table>
<thead>
<tr>
<th>Korrespondent</th>
<th class="text-right">Betrag</th>
<th class="text-right">Anteil</th>
<th class="text-right">Anzahl</th>
</tr>
</thead>
<tbody>
{% for name, stats in result.by_correspondent.items() %}
<tr>
<td>{{ name }}</td>
<td class="text-right amount">{{ stats.amount|format_amount }}</td>
<td class="text-right percentage">{{ stats.percentage|format_percent }}</td>
<td class="text-right">{{ stats.count }}</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% endif %}
<!-- Top Einzelposten -->
{% if result.top_items %}
<div class="section">
<h2>Top 10 Einzelposten</h2>
<table>
<thead>
<tr>
<th>#</th>
<th>Titel</th>
<th>Datum</th>
<th>Korrespondent</th>
<th class="text-right">Betrag</th>
</tr>
</thead>
<tbody>
{% for doc in result.top_items[:10] %}
<tr>
<td>{{ loop.index }}</td>
<td>
{% if doc.web_url %}
<a href="{{ doc.web_url }}" class="document-link" target="_blank">{{ doc.title }}</a>
{% else %}
{{ doc.title }}
{% endif %}
</td>
<td>{{ doc.effective_date|format_date }}</td>
<td>{{ doc.correspondent or '-' }}</td>
<td class="text-right amount amount-positive">{{ doc.betrag|format_amount }}</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% endif %}
<!-- Vergleich -->
{% if comparison %}
<div class="section">
<h2>Periodenvergleich: {{ comparison.period1.name }} vs {{ comparison.period2.name }}</h2>
<div class="summary-cards">
<div class="card">
<div class="card-title">{{ comparison.period1.name }}</div>
<div class="card-value">{{ comparison.period1.total|format_amount }}</div>
<div class="card-subtitle">{{ comparison.period1.count }} Dokumente</div>
</div>
<div class="card">
<div class="card-title">{{ comparison.period2.name }}</div>
<div class="card-value">{{ comparison.period2.total|format_amount }}</div>
<div class="card-subtitle">{{ comparison.period2.count }} Dokumente</div>
</div>
<div class="card">
<div class="card-title">Veränderung</div>
<div class="card-value {% if comparison.diff_percent > 0 %}change-positive{% else %}change-negative{% endif %}">
{{ '%+.1f'|format(comparison.diff_percent) }}%
</div>
<div class="card-subtitle">{{ comparison.diff_absolute|format_amount }}</div>
</div>
</div>
<table class="comparison-table">
<thead>
<tr>
<th>Kategorie</th>
<th class="text-right">{{ comparison.period1.name }}</th>
<th class="text-right">{{ comparison.period2.name }}</th>
<th class="text-right">Differenz</th>
<th class="text-right">Veränderung</th>
</tr>
</thead>
<tbody>
{% for cat, data in comparison.category_comparison.items() %}
<tr class="{% if data.status == 'new' %}new-item{% elif data.status == 'removed' %}removed-item{% endif %}">
<td>{{ cat }}</td>
<td class="text-right amount">{{ data.period1|format_amount }}</td>
<td class="text-right amount">{{ data.period2|format_amount }}</td>
<td class="text-right amount">{{ data.diff_absolute|format_amount }}</td>
<td class="text-right {% if data.diff_percent > 0 %}change-positive{% else %}change-negative{% endif %}">
{{ '%+.1f'|format(data.diff_percent) }}%
</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% endif %}
<!-- Alle Dokumente -->
<div class="section">
<h2>Alle Dokumente ({{ result.document_count }})</h2>
<table id="documentsTable">
<thead>
<tr>
<th>Datum</th>
<th>Titel</th>
<th>Korrespondent</th>
<th>Tags</th>
<th class="text-right">Betrag</th>
</tr>
</thead>
<tbody>
{% for doc in result.documents %}
<tr>
<td>{{ doc.effective_date|format_date }}</td>
<td>
{% if doc.web_url %}
<a href="{{ doc.web_url }}" class="document-link" target="_blank">{{ doc.title }}</a>
{% else %}
{{ doc.title }}
{% endif %}
</td>
<td>{{ doc.correspondent or '-' }}</td>
<td>
{% for tag in doc.tags %}
<span class="tag">{{ tag }}</span>
{% endfor %}
</td>
<td class="text-right amount">{{ doc.betrag|format_amount if doc.betrag else '-' }}</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
<!-- Export-Buttons -->
<div class="export-buttons">
<button class="btn" onclick="window.print()">Drucken / PDF</button>
<button class="btn btn-secondary" onclick="exportTableToCSV()">CSV exportieren</button>
</div>
<div class="footer">
<p>Paperless Finance Report &bull; Generiert am {{ generated_at|format_date('%d.%m.%Y um %H:%M') }}</p>
</div>
<script>
// Chart-Daten von Jinja
const tagChartData = {{ tag_chart_data|safe }};
const categoryChartData = {{ category_chart_data|safe }};
const monthChartData = {{ month_chart_data|safe }};
const correspondentChartData = {{ correspondent_chart_data|safe }};
const currency = "{{ currency }}";
// Formatierung für Tooltips
function formatAmount(value) {
return currency + " " + value.toLocaleString('de-CH', {
minimumFractionDigits: 2,
maximumFractionDigits: 2
});
}
// Tag-Chart (Doughnut)
if (tagChartData.labels && tagChartData.labels.length > 0) {
const tagCtx = document.getElementById('tagChart');
if (tagCtx) {
new Chart(tagCtx, {
type: 'doughnut',
data: {
labels: tagChartData.labels,
datasets: [{
data: tagChartData.values,
backgroundColor: tagChartData.colors,
borderWidth: 2,
borderColor: '#fff'
}]
},
options: {
responsive: true,
maintainAspectRatio: false,
plugins: {
legend: {
position: 'right',
labels: {
boxWidth: 12,
padding: 10
}
},
tooltip: {
callbacks: {
label: function(context) {
return context.label + ': ' + formatAmount(context.raw);
}
}
}
}
}
});
}
}
// Kategorie-Chart (Bar)
if (categoryChartData.labels && categoryChartData.labels.length > 0) {
const catCtx = document.getElementById('categoryChart');
if (catCtx) {
new Chart(catCtx, {
type: 'bar',
data: {
labels: categoryChartData.labels,
datasets: [{
data: categoryChartData.values,
backgroundColor: categoryChartData.colors,
borderWidth: 0
}]
},
options: {
responsive: true,
maintainAspectRatio: false,
indexAxis: 'y',
plugins: {
legend: {
display: false
},
tooltip: {
callbacks: {
label: function(context) {
return formatAmount(context.raw);
}
}
}
},
scales: {
x: {
ticks: {
callback: function(value) {
return formatAmount(value);
}
}
}
}
}
});
}
}
// Monats-Chart (Line)
if (monthChartData.labels && monthChartData.labels.length > 0) {
const monthCtx = document.getElementById('monthChart');
if (monthCtx) {
new Chart(monthCtx, {
type: 'line',
data: {
labels: monthChartData.labels,
datasets: [{
data: monthChartData.values,
borderColor: '#2E86AB',
backgroundColor: 'rgba(46, 134, 171, 0.1)',
fill: true,
tension: 0.3,
pointRadius: 4,
pointHoverRadius: 6
}]
},
options: {
responsive: true,
maintainAspectRatio: false,
plugins: {
legend: {
display: false
},
tooltip: {
callbacks: {
label: function(context) {
return formatAmount(context.raw);
}
}
}
},
scales: {
y: {
beginAtZero: true,
ticks: {
callback: function(value) {
return formatAmount(value);
}
}
}
}
}
});
}
}
// Korrespondent-Chart (Bar horizontal)
if (correspondentChartData.labels && correspondentChartData.labels.length > 0) {
const corrCtx = document.getElementById('correspondentChart');
if (corrCtx) {
new Chart(corrCtx, {
type: 'bar',
data: {
labels: correspondentChartData.labels,
datasets: [{
data: correspondentChartData.values,
backgroundColor: '#A23B72',
borderWidth: 0
}]
},
options: {
responsive: true,
maintainAspectRatio: false,
indexAxis: 'y',
plugins: {
legend: {
display: false
},
tooltip: {
callbacks: {
label: function(context) {
return formatAmount(context.raw);
}
}
}
},
scales: {
x: {
ticks: {
callback: function(value) {
return formatAmount(value);
}
}
}
}
}
});
}
}
// CSV Export
function exportTableToCSV() {
const table = document.getElementById('documentsTable');
const rows = table.querySelectorAll('tr');
let csv = [];
rows.forEach(row => {
const cells = row.querySelectorAll('th, td');
const rowData = [];
cells.forEach(cell => {
let text = cell.innerText.replace(/"/g, '""');
if (text.includes(';') || text.includes('"') || text.includes('\n')) {
text = '"' + text + '"';
}
rowData.push(text);
});
csv.push(rowData.join(';'));
});
const csvContent = '\ufeff' + csv.join('\n'); // BOM for Excel
const blob = new Blob([csvContent], { type: 'text/csv;charset=utf-8;' });
const link = document.createElement('a');
link.href = URL.createObjectURL(blob);
link.download = 'finanzbericht_export.csv';
link.click();
}
</script>
</body>
</html>