d2dd837f26
A Python CLI tool for generating financial reports from Paperless-ngx: - Phase 1 (MVP): Config handling, Paperless API client with auth and pagination, custom fields extraction, tag-based summation, CLI output - Phase 2 (Grouping): Multiple grouping criteria (tag, correspondent, category, payment type, month, quarter, year), percentage distribution - Phase 3 (Reports): HTML reports with Chart.js diagrams (doughnut, bar, line charts), PDF export via WeasyPrint, JSON and CSV export - Phase 4 (Comfort): Automatic tag ID resolution, disk caching with diskcache, colorized logging, comprehensive error handling Features: - Flexible date filtering (year, month, date range) - Period comparison with change analysis - Swiss franc formatting (CHF with apostrophe separators) - Interactive HTML reports with sortable tables and document links - Multiple output formats (CLI, HTML, PDF, JSON, CSV)
593 lines
18 KiB
Python
593 lines
18 KiB
Python
"""
Data extraction and aggregation for the Paperless Finance Report Tool.

Extracts custom fields from documents and aggregates the data
for various groupings.
"""
|
|
|
|
import logging
|
|
import re
|
|
from collections import defaultdict
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from decimal import Decimal, InvalidOperation
|
|
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
|
|
|
|
from dateutil.parser import parse as parse_date
|
|
|
|
from config import Config, get_config
|
|
from paperless_client import PaperlessClient
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class FinanceDocument:
    """A prepared finance document.

    Besides the stored fields, the class offers read-only date helpers
    (``year``, ``month``, ``month_year``, ``quarter``) that are all derived
    from ``effective_date``.
    """

    id: int
    title: str
    archive_date: Optional[datetime] = None
    created: Optional[datetime] = None
    added: Optional[datetime] = None

    # Paperless metadata
    correspondent: Optional[str] = None
    correspondent_id: Optional[int] = None
    document_type: Optional[str] = None
    tags: List[str] = field(default_factory=list)
    tag_ids: List[int] = field(default_factory=list)

    # Custom fields
    betrag: Optional[Decimal] = None
    rechnungsdatum: Optional[datetime] = None
    kategorie: Optional[str] = None
    zahlungsart: Optional[str] = None
    periode: Optional[str] = None
    notiz: Optional[str] = None

    # URLs
    web_url: Optional[str] = None

    # Original data
    raw_data: Dict = field(default_factory=dict)

    @property
    def effective_date(self) -> Optional[datetime]:
        """Return the invoice date, falling back to the archive date."""
        if self.rechnungsdatum:
            return self.rechnungsdatum
        return self.archive_date

    @property
    def year(self) -> Optional[int]:
        """Return the year of the effective date, or None without a date."""
        stamp = self.effective_date
        if not stamp:
            return None
        return stamp.year

    @property
    def month(self) -> Optional[int]:
        """Return the month of the effective date, or None without a date."""
        stamp = self.effective_date
        if not stamp:
            return None
        return stamp.month

    @property
    def month_year(self) -> Optional[str]:
        """Return year and month as a string (e.g. '2024-01'), or None."""
        stamp = self.effective_date
        if not stamp:
            return None
        return stamp.strftime('%Y-%m')

    @property
    def quarter(self) -> Optional[str]:
        """Return the quarter as a string (e.g. 'Q1 2024'), or None."""
        stamp = self.effective_date
        if stamp:
            # Months 1-3 -> 1, 4-6 -> 2, 7-9 -> 3, 10-12 -> 4.
            number = (stamp.month + 2) // 3
            return f"Q{number} {stamp.year}"
        return None
|
|
|
|
|
|
class DocumentExtractor:
    """Extracts and normalises documents fetched from Paperless.

    Raw document dictionaries are turned into ``FinanceDocument`` instances;
    amounts and dates found in custom fields are parsed leniently, and
    unparseable values become ``None`` instead of raising.
    """

    # Currency markers that may precede or follow an amount.
    _CURRENCY_PREFIX = re.compile(r'^(CHF|EUR|USD|Fr\.?)\s*', re.IGNORECASE)
    _CURRENCY_SUFFIX = re.compile(r'\s*(CHF|EUR|USD|Fr\.?)$', re.IGNORECASE)

    # Apostrophes used as thousands separators (ASCII and typographic).
    _APOSTROPHES = ("'", "\u2019")

    # 1.234,56  -> dot groups, comma decimals
    _DOT_GROUPS_COMMA_DECIMAL = re.compile(r'^([+-]?\d{1,3}(?:\.\d{3})+),(\d+)$')
    # 1,234.56  -> comma groups, dot decimals
    _COMMA_GROUPS_DOT_DECIMAL = re.compile(r'^([+-]?\d{1,3}(?:,\d{3})+)\.(\d+)$')
    # 1.234.567 / 1,234,567 -> two or more groups can only be thousands
    # separators. A single group such as "1.234" stays ambiguous and keeps
    # its historical meaning (decimal separator).
    _ONLY_GROUPS = re.compile(r'^[+-]?\d{1,3}(?:([.,])\d{3})(?:\1\d{3})+$')

    def __init__(self, client: PaperlessClient, config: Optional[Config] = None):
        """
        Initialise the extractor.

        Args:
            client: Paperless API client
            config: Configuration; ``get_config()`` is used when omitted
        """
        self.client = client
        self.config = config or get_config()
        self._custom_fields_map: Dict[str, int] = {}
        # An empty map is a valid lookup result, so "already fetched" has to
        # be tracked separately from the map's truthiness.
        self._custom_fields_loaded = False

    def _build_custom_fields_map(self) -> None:
        """Build the mapping from lower-cased field names to field IDs (once)."""
        # getattr: instances created without __init__ may lack the flag.
        if self._custom_fields_map or getattr(self, '_custom_fields_loaded', False):
            return

        fields = self.client.get_custom_fields() or {}
        mapping = {
            field_def['name'].lower(): field_id
            for field_id, field_def in fields.items()
        }
        self._custom_fields_map.update(mapping)
        # Only mark as loaded after a successful fetch so errors are retried.
        self._custom_fields_loaded = True

    def _normalise_amount_text(self, text: str) -> str:
        """Strip currency markers and separators so ``Decimal`` can parse *text*."""
        cleaned = text.strip()
        cleaned = self._CURRENCY_PREFIX.sub('', cleaned)
        cleaned = self._CURRENCY_SUFFIX.sub('', cleaned)

        # Swiss format: 1'234.56 or 1'234,56
        for mark in self._APOSTROPHES:
            cleaned = cleaned.replace(mark, '')

        for pattern, group_sep in (
            (self._DOT_GROUPS_COMMA_DECIMAL, '.'),
            (self._COMMA_GROUPS_DOT_DECIMAL, ','),
        ):
            match = pattern.match(cleaned)
            if match:
                integer_part = match.group(1).replace(group_sep, '')
                return f"{integer_part}.{match.group(2)}"

        if self._ONLY_GROUPS.match(cleaned):
            return cleaned.replace('.', '').replace(',', '')

        # Comma as decimal separator without thousands separators: 1234,56
        if ',' in cleaned and '.' not in cleaned:
            return cleaned.replace(',', '.')

        return cleaned

    def _parse_decimal(self, value: Any) -> Optional[Decimal]:
        """
        Parse a value into a Decimal.

        Handles several formats:
        - 1234.56
        - 1234,56
        - 1'234.56 (Swiss format, ASCII or typographic apostrophe)
        - 1.234,56 / 1,234.56 (grouped, optionally signed)
        - CHF 1234.56

        Returns:
            The parsed amount, or None for missing, empty, non-numeric or
            non-finite (NaN/Infinity) input.
        """
        # bool is a subclass of int, but Decimal('True') is not a number.
        if value is None or isinstance(value, bool):
            return None

        if isinstance(value, Decimal):
            amount = value
        elif isinstance(value, (int, float)):
            amount = Decimal(str(value))
        elif isinstance(value, str):
            cleaned = self._normalise_amount_text(value)
            if not cleaned:
                # An empty custom field is normal; nothing to warn about.
                return None
            try:
                amount = Decimal(cleaned)
            except InvalidOperation:
                logger.warning("Konnte Betrag nicht parsen: %s", value)
                return None
        else:
            return None

        # NaN/Infinity would poison sums and make sorting raise later on.
        if not amount.is_finite():
            logger.warning("Konnte Betrag nicht parsen: %s", value)
            return None
        return amount

    def _parse_date(self, value: Any) -> Optional[datetime]:
        """Parse a value into a datetime; return None when that is not possible."""
        # Local import: the module itself only imports ``datetime``.
        from datetime import date

        if value is None:
            return None

        if isinstance(value, datetime):
            return value

        if isinstance(value, date):
            # Plain dates are promoted to midnight of that day.
            return datetime.combine(value, datetime.min.time())

        if not isinstance(value, str):
            return None

        text = value.strip()
        if not text:
            return None

        try:
            return parse_date(text)
        except (ValueError, TypeError, OverflowError):
            # dateutil documents OverflowError for out-of-range values.
            logger.warning("Konnte Datum nicht parsen: %s", value)
            return None

    def _get_custom_field_value(self, doc: dict, field_name: str) -> Any:
        """
        Return the value of a custom field of a document.

        ``doc['custom_fields_resolved']`` (name -> ``{'value': ...}``) is
        consulted first with the exact name; otherwise the raw
        ``doc['custom_fields']`` entries are matched case-insensitively via
        the field-name/ID map. Returns None when the field is not present.
        """
        # From resolved fields
        resolved = doc.get('custom_fields_resolved') or {}
        if field_name in resolved:
            return resolved[field_name].get('value')

        # From raw custom_fields
        self._build_custom_fields_map()
        wanted_name = field_name.lower()
        if wanted_name not in self._custom_fields_map:
            return None
        wanted_id = self._custom_fields_map[wanted_name]

        for entry in doc.get('custom_fields') or []:
            if entry.get('field') == wanted_id:
                return entry.get('value')

        return None

    def extract_document(self, raw_doc: dict) -> FinanceDocument:
        """
        Build a prepared FinanceDocument from the raw data.

        Args:
            raw_doc: Raw document dictionary from the API

        Returns:
            FinanceDocument instance

        Raises:
            KeyError: if ``raw_doc`` has no ``'id'`` entry
        """
        # Custom field names from the config (logical key -> field name)
        cf_names = self.config.custom_field_names or {}

        def custom_value(key: str) -> Any:
            return self._get_custom_field_value(raw_doc, cf_names.get(key, key))

        return FinanceDocument(
            # Base data
            id=raw_doc['id'],
            title=raw_doc.get('title', ''),
            raw_data=raw_doc,
            # Date fields
            archive_date=self._parse_date(raw_doc.get('archive_date')),
            created=self._parse_date(raw_doc.get('created')),
            added=self._parse_date(raw_doc.get('added')),
            # Correspondent / document type
            correspondent_id=raw_doc.get('correspondent'),
            correspondent=raw_doc.get('correspondent_name', ''),
            document_type=raw_doc.get('document_type_name', ''),
            # Tags
            tag_ids=raw_doc.get('tags', []),
            tags=raw_doc.get('tag_names', []),
            # URL
            web_url=raw_doc.get('web_url', ''),
            # Custom fields
            betrag=self._parse_decimal(custom_value('betrag')),
            rechnungsdatum=self._parse_date(custom_value('rechnungsdatum')),
            kategorie=custom_value('kategorie'),
            zahlungsart=custom_value('zahlungsart'),
            periode=custom_value('periode'),
            notiz=custom_value('notiz'),
        )

    def extract_documents(self, raw_docs: List[dict]) -> List[FinanceDocument]:
        """
        Extract several documents.

        Args:
            raw_docs: List of raw documents

        Returns:
            List of FinanceDocument instances
        """
        # Resolve metadata through the client first.
        # NOTE(review): presumably this adds the *_name / tag_names entries
        # read by extract_document - confirm against PaperlessClient.
        resolved = self.client.resolve_all_metadata(raw_docs)

        return [self.extract_document(doc) for doc in resolved]
|
|
|
|
|
|
@dataclass
class AggregationResult:
    """Result of an aggregation run over a list of finance documents."""

    # Base statistics
    total_amount: Decimal = Decimal('0')
    document_count: int = 0
    documents_with_amount: int = 0
    documents_without_amount: int = 0

    # Documents
    documents: List[FinanceDocument] = field(default_factory=list)

    # Grouped data (group name -> statistics)
    by_tag: Dict[str, 'GroupStats'] = field(default_factory=dict)
    by_correspondent: Dict[str, 'GroupStats'] = field(default_factory=dict)
    by_category: Dict[str, 'GroupStats'] = field(default_factory=dict)
    by_payment_type: Dict[str, 'GroupStats'] = field(default_factory=dict)
    by_month: Dict[str, 'GroupStats'] = field(default_factory=dict)
    by_quarter: Dict[str, 'GroupStats'] = field(default_factory=dict)
    # Keys are strings such as "2024": DataAggregator._group_by_field
    # builds every group key with str(value).
    by_year: Dict[str, 'GroupStats'] = field(default_factory=dict)

    # Additional statistics
    average_amount: Decimal = Decimal('0')
    median_amount: Decimal = Decimal('0')
    min_amount: Decimal = Decimal('0')
    max_amount: Decimal = Decimal('0')
    top_items: List[FinanceDocument] = field(default_factory=list)

    @property
    def total_formatted(self) -> str:
        """Return the total with two decimals and apostrophe thousands separators."""
        grouped = f"{self.total_amount:,.2f}"
        return grouped.replace(',', "'")
|
|
|
|
|
|
@dataclass
class GroupStats:
    """Statistics for a single group of documents."""

    name: str
    amount: Decimal = Decimal('0')
    count: int = 0
    percentage: float = 0.0
    documents: List[FinanceDocument] = field(default_factory=list)

    @property
    def amount_formatted(self) -> str:
        """Return the amount with two decimals and apostrophe thousands separators."""
        with_commas = format(self.amount, ",.2f")
        return with_commas.replace(',', "'")
|
|
|
|
|
|
class DataAggregator:
    """Aggregates finance documents by various criteria."""

    # Group names emitted for documents without a tag / without a field value.
    _NO_TAG_LABEL = 'Ohne Tag'
    _UNASSIGNED_LABEL = 'Nicht zugeordnet'

    # group_by criterion -> (AggregationResult attribute, FinanceDocument attribute)
    _FIELD_GROUPINGS = (
        ('correspondent', 'by_correspondent', 'correspondent'),
        ('category', 'by_category', 'kategorie'),
        ('payment_type', 'by_payment_type', 'zahlungsart'),
        ('month', 'by_month', 'month_year'),
        ('quarter', 'by_quarter', 'quarter'),
        ('year', 'by_year', 'year'),
    )

    # period_type -> FinanceDocument attribute used by compare_periods
    _PERIOD_ATTRIBUTES = {
        'year': 'year',
        'quarter': 'quarter',
        'month': 'month_year',
    }

    def __init__(self, config: Optional[Config] = None):
        """
        Initialise the aggregator.

        Args:
            config: Configuration; ``get_config()`` is used when omitted
        """
        self.config = config or get_config()

    def aggregate(
        self,
        documents: List[FinanceDocument],
        group_by: Optional[List[str]] = None
    ) -> AggregationResult:
        """
        Aggregate documents.

        Args:
            documents: List of documents
            group_by: List of grouping criteria:
                'tag', 'correspondent', 'category', 'payment_type',
                'month', 'quarter', 'year'.
                None or an empty list selects the default
                ['tag', 'correspondent', 'category', 'month'].

        Returns:
            AggregationResult with all statistics
        """
        result = AggregationResult()
        result.documents = documents
        result.document_count = len(documents)

        # Collect amounts (documents without an amount only count)
        docs_with_amount = [doc for doc in documents if doc.betrag is not None]
        result.documents_with_amount = len(docs_with_amount)
        result.documents_without_amount = (
            result.document_count - result.documents_with_amount
        )
        result.total_amount = sum(
            (doc.betrag for doc in docs_with_amount), Decimal('0')
        )

        # Base statistics
        if docs_with_amount:
            ordered = sorted(doc.betrag for doc in docs_with_amount)
            result.min_amount = ordered[0]
            result.max_amount = ordered[-1]
            result.average_amount = result.total_amount / len(ordered)

            # Median: middle element, or mean of the two middle elements
            mid = len(ordered) // 2
            if len(ordered) % 2 == 0:
                result.median_amount = (ordered[mid - 1] + ordered[mid]) / 2
            else:
                result.median_amount = ordered[mid]

        # Top items: the ten largest amounts
        result.top_items = sorted(
            docs_with_amount,
            key=lambda doc: doc.betrag,
            reverse=True
        )[:10]

        # Groupings. `or` is intentional: an empty selection also falls back
        # to the defaults, as it always has.
        group_by = group_by or ['tag', 'correspondent', 'category', 'month']

        if 'tag' in group_by:
            result.by_tag = self._group_by_tags(documents, result.total_amount)

        for criterion, target, doc_field in self._FIELD_GROUPINGS:
            if criterion in group_by:
                setattr(
                    result,
                    target,
                    self._group_by_field(documents, doc_field, result.total_amount),
                )

        return result

    @staticmethod
    def _add_to_group(
        groups: Dict[str, GroupStats],
        key: str,
        doc: FinanceDocument
    ) -> None:
        """Register *doc* in the group *key*, creating the group on first use."""
        stats = groups.get(key)
        if stats is None:
            stats = groups[key] = GroupStats(name=key)
        stats.count += 1
        if doc.betrag:
            stats.amount += doc.betrag
        stats.documents.append(doc)

    @staticmethod
    def _apply_percentages(groups: Dict[str, GroupStats], total: Decimal) -> None:
        """Set each group's share of *total*; left at 0.0 unless total > 0."""
        if total > 0:
            for stats in groups.values():
                stats.percentage = float(stats.amount / total * 100)

    @staticmethod
    def _sorted_by_amount(groups: Dict[str, GroupStats]) -> Dict[str, GroupStats]:
        """Return the groups ordered by amount, largest first."""
        return dict(sorted(
            groups.items(),
            key=lambda item: item[1].amount,
            reverse=True
        ))

    @staticmethod
    def _quarter_sort_key(label: str) -> Tuple[int, int, int, str]:
        """Sort key ordering 'Q<n> <year>' labels by year, then quarter.

        Labels that do not have that shape (e.g. the unassigned group) sort
        after all real quarters.
        """
        head, _, tail = label.partition(' ')
        if head[:1] == 'Q' and head[1:].isdigit() and tail.isdigit():
            return (0, int(tail), int(head[1:]), label)
        return (1, 0, 0, label)

    def _group_by_tags(
        self,
        documents: List[FinanceDocument],
        total: Decimal
    ) -> Dict[str, GroupStats]:
        """Group by tags (a document with several tags counts in each of them)."""
        groups: Dict[str, GroupStats] = {}

        for doc in documents:
            # Untagged documents are collected under a dedicated label.
            for tag in doc.tags or [self._NO_TAG_LABEL]:
                self._add_to_group(groups, tag, doc)

        self._apply_percentages(groups, total)
        return self._sorted_by_amount(groups)

    def _group_by_field(
        self,
        documents: List[FinanceDocument],
        field: str,
        total: Decimal
    ) -> Dict[str, GroupStats]:
        """Group by a single document attribute.

        Group keys are always strings (``str(value)``); documents whose
        attribute is None or '' go into the unassigned group. Month and
        quarter groups are returned chronologically, all others by amount
        (largest first).
        """
        groups: Dict[str, GroupStats] = {}

        for doc in documents:
            value = getattr(doc, field, None)
            if value is None or value == '':
                key = self._UNASSIGNED_LABEL
            else:
                key = str(value)
            self._add_to_group(groups, key, doc)

        self._apply_percentages(groups, total)

        if field == 'quarter':
            # 'Q1 2024' < 'Q2 2023' lexicographically, so sort by
            # (year, quarter) instead of by the label text.
            return dict(sorted(
                groups.items(),
                key=lambda item: self._quarter_sort_key(item[0])
            ))
        if field == 'month_year':
            # 'YYYY-MM' labels sort chronologically as plain strings.
            return dict(sorted(groups.items(), key=lambda item: item[0]))
        return self._sorted_by_amount(groups)

    def compare_periods(
        self,
        documents: List[FinanceDocument],
        period1: Union[int, str],
        period2: Union[int, str],
        period_type: str = 'year'
    ) -> Dict[str, Any]:
        """
        Compare two periods.

        Args:
            documents: All documents
            period1: First period (e.g. 2023)
            period2: Second period (e.g. 2024)
            period_type: 'year', 'quarter', 'month'

        Returns:
            Dict with 'period1' / 'period2' (name, total, count, aggregation),
            'diff_absolute', 'diff_percent' and 'category_comparison'
        """
        period_attr = self._PERIOD_ATTRIBUTES.get(period_type)

        def in_period(doc: FinanceDocument, wanted: Union[int, str]) -> bool:
            current = getattr(doc, period_attr) if period_attr else None
            if current == wanted:
                return True
            if current is None or wanted is None:
                return False
            # Years are ints on the document but may arrive as text
            # (e.g. "2023"), so also compare the textual form.
            return str(current) == str(wanted).strip()

        # Filter documents by period
        docs1 = [doc for doc in documents if in_period(doc, period1)]
        docs2 = [doc for doc in documents if in_period(doc, period2)]

        agg1 = self.aggregate(docs1, ['tag', 'category'])
        agg2 = self.aggregate(docs2, ['tag', 'category'])

        # Compute differences
        diff_absolute = agg2.total_amount - agg1.total_amount
        if agg1.total_amount > 0:
            diff_percent = float(diff_absolute / agg1.total_amount * 100)
        else:
            diff_percent = 0.0

        # Compare categories (sorted for a deterministic result order)
        category_comparison = {}
        all_categories = sorted(set(agg1.by_category) | set(agg2.by_category))

        for cat in all_categories:
            stats1 = agg1.by_category.get(cat, GroupStats(name=cat))
            stats2 = agg2.by_category.get(cat, GroupStats(name=cat))

            diff = stats2.amount - stats1.amount
            if stats1.amount > 0:
                pct_change = float(diff / stats1.amount * 100)
            elif stats2.amount > 0:
                # Nothing to compare against: reported as +100 %.
                pct_change = 100.0
            else:
                pct_change = 0.0

            if stats1.amount == 0:
                status = 'new'
            elif stats2.amount == 0:
                status = 'removed'
            else:
                status = 'changed'

            category_comparison[cat] = {
                'period1': stats1.amount,
                'period2': stats2.amount,
                'diff_absolute': diff,
                'diff_percent': pct_change,
                'status': status,
            }

        return {
            'period1': {
                'name': str(period1),
                'total': agg1.total_amount,
                'count': agg1.document_count,
                'aggregation': agg1,
            },
            'period2': {
                'name': str(period2),
                'total': agg2.total_amount,
                'count': agg2.document_count,
                'aggregation': agg2,
            },
            'diff_absolute': diff_absolute,
            'diff_percent': diff_percent,
            'category_comparison': category_comparison,
        }
|