Files
Ai/paperless-report/extractor.py
T
Claude d2dd837f26 Add Paperless Finance Report Tool - Complete implementation
A Python CLI tool for generating financial reports from Paperless-ngx:

- Phase 1 (MVP): Config handling, Paperless API client with auth and
  pagination, custom fields extraction, tag-based summation, CLI output
- Phase 2 (Grouping): Multiple grouping criteria (tag, correspondent,
  category, payment type, month, quarter, year), percentage distribution
- Phase 3 (Reports): HTML reports with Chart.js diagrams (doughnut, bar,
  line charts), PDF export via WeasyPrint, JSON and CSV export
- Phase 4 (Comfort): Automatic tag ID resolution, disk caching with
  diskcache, colorized logging, comprehensive error handling

Features:
- Flexible date filtering (year, month, date range)
- Period comparison with change analysis
- Swiss franc formatting (CHF with apostrophe separators)
- Interactive HTML reports with sortable tables and document links
- Multiple output formats (CLI, HTML, PDF, JSON, CSV)
2025-12-07 10:09:10 +00:00

593 lines
18 KiB
Python

"""
Daten-Extraktion und Aggregation für das Paperless Finance Report Tool.
Extrahiert Custom Fields aus Dokumenten und aggregiert die Daten
für verschiedene Gruppierungen.
"""
import logging
import re
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime
from decimal import Decimal, InvalidOperation
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from dateutil.parser import parse as parse_date
from config import Config, get_config
from paperless_client import PaperlessClient
logger = logging.getLogger(__name__)
@dataclass
class FinanceDocument:
"""Ein aufbereitetes Finanzdokument."""
id: int
title: str
archive_date: Optional[datetime] = None
created: Optional[datetime] = None
added: Optional[datetime] = None
# Paperless Metadata
correspondent: Optional[str] = None
correspondent_id: Optional[int] = None
document_type: Optional[str] = None
tags: List[str] = field(default_factory=list)
tag_ids: List[int] = field(default_factory=list)
# Custom Fields
betrag: Optional[Decimal] = None
rechnungsdatum: Optional[datetime] = None
kategorie: Optional[str] = None
zahlungsart: Optional[str] = None
periode: Optional[str] = None
notiz: Optional[str] = None
# URLs
web_url: Optional[str] = None
# Original-Daten
raw_data: Dict = field(default_factory=dict)
@property
def effective_date(self) -> Optional[datetime]:
"""Das effektive Datum (Rechnungsdatum oder Archivdatum)."""
return self.rechnungsdatum or self.archive_date
@property
def year(self) -> Optional[int]:
"""Jahr des effektiven Datums."""
date = self.effective_date
return date.year if date else None
@property
def month(self) -> Optional[int]:
"""Monat des effektiven Datums."""
date = self.effective_date
return date.month if date else None
@property
def month_year(self) -> Optional[str]:
"""Monat/Jahr als String (z.B. '2024-01')."""
date = self.effective_date
return date.strftime('%Y-%m') if date else None
@property
def quarter(self) -> Optional[str]:
"""Quartal als String (z.B. 'Q1 2024')."""
date = self.effective_date
if not date:
return None
q = (date.month - 1) // 3 + 1
return f"Q{q} {date.year}"
class DocumentExtractor:
"""Extrahiert und verarbeitet Dokumente aus Paperless."""
def __init__(self, client: PaperlessClient, config: Optional[Config] = None):
"""
Initialisiert den Extractor.
Args:
client: Paperless API Client
config: Konfiguration
"""
self.client = client
self.config = config or get_config()
self._custom_fields_map: Dict[str, int] = {}
def _build_custom_fields_map(self) -> None:
"""Baut ein Mapping von Feldnamen zu IDs."""
if self._custom_fields_map:
return
fields = self.client.get_custom_fields()
for field_id, field_def in fields.items():
name = field_def['name'].lower()
self._custom_fields_map[name] = field_id
def _parse_decimal(self, value: Any) -> Optional[Decimal]:
"""
Parst einen Wert zu Decimal.
Verarbeitet verschiedene Formate:
- 1234.56
- 1234,56
- 1'234.56 (Schweizer Format)
- CHF 1234.56
"""
if value is None:
return None
if isinstance(value, (int, float)):
return Decimal(str(value))
if isinstance(value, Decimal):
return value
if not isinstance(value, str):
return None
# String bereinigen
value = value.strip()
# Währungssymbole entfernen
value = re.sub(r'^(CHF|EUR|USD|Fr\.?)\s*', '', value, flags=re.IGNORECASE)
value = re.sub(r'\s*(CHF|EUR|USD|Fr\.?)$', '', value, flags=re.IGNORECASE)
# Tausender-Trennzeichen entfernen (Apostroph, Punkt als Tausender)
# Schweizer Format: 1'234.56 oder 1'234,56
if "'" in value:
value = value.replace("'", "")
# Deutsches/Schweizer Format mit Punkt als Tausender: 1.234,56
if re.match(r'^\d{1,3}(\.\d{3})+,\d{2}$', value):
value = value.replace(".", "").replace(",", ".")
# Komma als Dezimaltrennzeichen ohne Tausender
elif "," in value and "." not in value:
value = value.replace(",", ".")
try:
return Decimal(value)
except InvalidOperation:
logger.warning(f"Konnte Betrag nicht parsen: {value}")
return None
def _parse_date(self, value: Any) -> Optional[datetime]:
"""Parst einen Wert zu datetime."""
if value is None:
return None
if isinstance(value, datetime):
return value
if not isinstance(value, str):
return None
try:
return parse_date(value)
except (ValueError, TypeError):
logger.warning(f"Konnte Datum nicht parsen: {value}")
return None
def _get_custom_field_value(self, doc: dict, field_name: str) -> Any:
"""Holt den Wert eines Custom Fields aus einem Dokument."""
# Aus resolved fields
resolved = doc.get('custom_fields_resolved', {})
if field_name in resolved:
return resolved[field_name].get('value')
# Aus rohen custom_fields
self._build_custom_fields_map()
field_name_lower = field_name.lower()
for cf in doc.get('custom_fields', []):
field_id = cf.get('field')
# Prüfen ob ID zum gesuchten Feldnamen passt
for name, fid in self._custom_fields_map.items():
if fid == field_id and name == field_name_lower:
return cf.get('value')
return None
def extract_document(self, raw_doc: dict) -> FinanceDocument:
"""
Extrahiert ein aufbereitetes FinanceDocument aus den Rohdaten.
Args:
raw_doc: Rohes Dokument-Dictionary von der API
Returns:
FinanceDocument-Instanz
"""
# Custom Field Namen aus Config
cf_names = self.config.custom_field_names
# Basis-Daten
doc = FinanceDocument(
id=raw_doc['id'],
title=raw_doc.get('title', ''),
raw_data=raw_doc
)
# Datums-Felder
doc.archive_date = self._parse_date(raw_doc.get('archive_date'))
doc.created = self._parse_date(raw_doc.get('created'))
doc.added = self._parse_date(raw_doc.get('added'))
# Korrespondent
doc.correspondent_id = raw_doc.get('correspondent')
doc.correspondent = raw_doc.get('correspondent_name', '')
# Dokumenttyp
doc.document_type = raw_doc.get('document_type_name', '')
# Tags
doc.tag_ids = raw_doc.get('tags', [])
doc.tags = raw_doc.get('tag_names', [])
# URL
doc.web_url = raw_doc.get('web_url', '')
# Custom Fields
betrag_name = cf_names.get('betrag', 'betrag')
doc.betrag = self._parse_decimal(
self._get_custom_field_value(raw_doc, betrag_name)
)
datum_name = cf_names.get('rechnungsdatum', 'rechnungsdatum')
doc.rechnungsdatum = self._parse_date(
self._get_custom_field_value(raw_doc, datum_name)
)
kat_name = cf_names.get('kategorie', 'kategorie')
doc.kategorie = self._get_custom_field_value(raw_doc, kat_name)
zahl_name = cf_names.get('zahlungsart', 'zahlungsart')
doc.zahlungsart = self._get_custom_field_value(raw_doc, zahl_name)
periode_name = cf_names.get('periode', 'periode')
doc.periode = self._get_custom_field_value(raw_doc, periode_name)
notiz_name = cf_names.get('notiz', 'notiz')
doc.notiz = self._get_custom_field_value(raw_doc, notiz_name)
return doc
def extract_documents(self, raw_docs: List[dict]) -> List[FinanceDocument]:
"""
Extrahiert mehrere Dokumente.
Args:
raw_docs: Liste von Roh-Dokumenten
Returns:
Liste von FinanceDocument-Instanzen
"""
# Metadaten auflösen
resolved = self.client.resolve_all_metadata(raw_docs)
return [self.extract_document(doc) for doc in resolved]
@dataclass
class AggregationResult:
"""Ergebnis einer Aggregation."""
# Basis-Statistiken
total_amount: Decimal = Decimal('0')
document_count: int = 0
documents_with_amount: int = 0
documents_without_amount: int = 0
# Dokumente
documents: List[FinanceDocument] = field(default_factory=list)
# Gruppierte Daten
by_tag: Dict[str, 'GroupStats'] = field(default_factory=dict)
by_correspondent: Dict[str, 'GroupStats'] = field(default_factory=dict)
by_category: Dict[str, 'GroupStats'] = field(default_factory=dict)
by_payment_type: Dict[str, 'GroupStats'] = field(default_factory=dict)
by_month: Dict[str, 'GroupStats'] = field(default_factory=dict)
by_quarter: Dict[str, 'GroupStats'] = field(default_factory=dict)
by_year: Dict[int, 'GroupStats'] = field(default_factory=dict)
# Zusätzliche Statistiken
average_amount: Decimal = Decimal('0')
median_amount: Decimal = Decimal('0')
min_amount: Decimal = Decimal('0')
max_amount: Decimal = Decimal('0')
top_items: List[FinanceDocument] = field(default_factory=list)
@property
def total_formatted(self) -> str:
"""Formatierte Gesamtsumme."""
return f"{self.total_amount:,.2f}".replace(',', "'")
@dataclass
class GroupStats:
"""Statistiken für eine Gruppe."""
name: str
amount: Decimal = Decimal('0')
count: int = 0
percentage: float = 0.0
documents: List[FinanceDocument] = field(default_factory=list)
@property
def amount_formatted(self) -> str:
"""Formatierter Betrag."""
return f"{self.amount:,.2f}".replace(',', "'")
class DataAggregator:
"""Aggregiert Finanzdokumente nach verschiedenen Kriterien."""
def __init__(self, config: Optional[Config] = None):
"""
Initialisiert den Aggregator.
Args:
config: Konfiguration
"""
self.config = config or get_config()
def aggregate(
self,
documents: List[FinanceDocument],
group_by: Optional[List[str]] = None
) -> AggregationResult:
"""
Aggregiert Dokumente.
Args:
documents: Liste von Dokumenten
group_by: Liste von Gruppierungskriterien:
'tag', 'correspondent', 'category', 'payment_type',
'month', 'quarter', 'year'
Returns:
AggregationResult mit allen Statistiken
"""
result = AggregationResult()
result.documents = documents
result.document_count = len(documents)
# Beträge sammeln
amounts: List[Decimal] = []
for doc in documents:
if doc.betrag is not None:
result.total_amount += doc.betrag
result.documents_with_amount += 1
amounts.append(doc.betrag)
else:
result.documents_without_amount += 1
# Basis-Statistiken
if amounts:
amounts_sorted = sorted(amounts)
result.min_amount = amounts_sorted[0]
result.max_amount = amounts_sorted[-1]
result.average_amount = result.total_amount / len(amounts)
# Median
mid = len(amounts_sorted) // 2
if len(amounts_sorted) % 2 == 0:
result.median_amount = (amounts_sorted[mid - 1] + amounts_sorted[mid]) / 2
else:
result.median_amount = amounts_sorted[mid]
# Top-Posten
docs_with_amount = [d for d in documents if d.betrag is not None]
result.top_items = sorted(
docs_with_amount,
key=lambda d: d.betrag or Decimal('0'),
reverse=True
)[:10]
# Gruppierungen
group_by = group_by or ['tag', 'correspondent', 'category', 'month']
if 'tag' in group_by:
result.by_tag = self._group_by_tags(documents, result.total_amount)
if 'correspondent' in group_by:
result.by_correspondent = self._group_by_field(
documents, 'correspondent', result.total_amount
)
if 'category' in group_by:
result.by_category = self._group_by_field(
documents, 'kategorie', result.total_amount
)
if 'payment_type' in group_by:
result.by_payment_type = self._group_by_field(
documents, 'zahlungsart', result.total_amount
)
if 'month' in group_by:
result.by_month = self._group_by_field(
documents, 'month_year', result.total_amount
)
if 'quarter' in group_by:
result.by_quarter = self._group_by_field(
documents, 'quarter', result.total_amount
)
if 'year' in group_by:
result.by_year = self._group_by_field(
documents, 'year', result.total_amount
)
return result
def _group_by_tags(
self,
documents: List[FinanceDocument],
total: Decimal
) -> Dict[str, GroupStats]:
"""Gruppiert nach Tags (ein Dokument kann mehrere Tags haben)."""
groups: Dict[str, GroupStats] = {}
for doc in documents:
if not doc.tags:
tag_name = 'Ohne Tag'
if tag_name not in groups:
groups[tag_name] = GroupStats(name=tag_name)
groups[tag_name].count += 1
if doc.betrag:
groups[tag_name].amount += doc.betrag
groups[tag_name].documents.append(doc)
else:
for tag in doc.tags:
if tag not in groups:
groups[tag] = GroupStats(name=tag)
groups[tag].count += 1
if doc.betrag:
groups[tag].amount += doc.betrag
groups[tag].documents.append(doc)
# Prozente berechnen
if total > 0:
for stats in groups.values():
stats.percentage = float(stats.amount / total * 100)
# Nach Betrag sortieren
return dict(sorted(
groups.items(),
key=lambda x: x[1].amount,
reverse=True
))
def _group_by_field(
self,
documents: List[FinanceDocument],
field: str,
total: Decimal
) -> Dict[str, GroupStats]:
"""Gruppiert nach einem einzelnen Feld."""
groups: Dict[str, GroupStats] = {}
for doc in documents:
value = getattr(doc, field, None)
if value is None or value == '':
key = 'Nicht zugeordnet'
else:
key = str(value)
if key not in groups:
groups[key] = GroupStats(name=key)
groups[key].count += 1
if doc.betrag:
groups[key].amount += doc.betrag
groups[key].documents.append(doc)
# Prozente berechnen
if total > 0:
for stats in groups.values():
stats.percentage = float(stats.amount / total * 100)
# Nach Betrag sortieren (bei Monaten chronologisch)
if field in ('month_year', 'quarter'):
return dict(sorted(groups.items()))
else:
return dict(sorted(
groups.items(),
key=lambda x: x[1].amount,
reverse=True
))
def compare_periods(
self,
documents: List[FinanceDocument],
period1: Union[int, str],
period2: Union[int, str],
period_type: str = 'year'
) -> Dict[str, Any]:
"""
Vergleicht zwei Zeiträume.
Args:
documents: Alle Dokumente
period1: Erste Periode (z.B. 2023)
period2: Zweite Periode (z.B. 2024)
period_type: 'year', 'quarter', 'month'
Returns:
Vergleichsergebnis
"""
# Dokumente nach Periode filtern
def get_period(doc: FinanceDocument) -> Optional[Union[int, str]]:
if period_type == 'year':
return doc.year
elif period_type == 'quarter':
return doc.quarter
elif period_type == 'month':
return doc.month_year
return None
docs1 = [d for d in documents if get_period(d) == period1]
docs2 = [d for d in documents if get_period(d) == period2]
agg1 = self.aggregate(docs1, ['tag', 'category'])
agg2 = self.aggregate(docs2, ['tag', 'category'])
# Differenzen berechnen
diff_absolute = agg2.total_amount - agg1.total_amount
diff_percent = (
float(diff_absolute / agg1.total_amount * 100)
if agg1.total_amount > 0 else 0
)
# Kategorien vergleichen
category_comparison = {}
all_categories = set(agg1.by_category.keys()) | set(agg2.by_category.keys())
for cat in all_categories:
stats1 = agg1.by_category.get(cat, GroupStats(name=cat))
stats2 = agg2.by_category.get(cat, GroupStats(name=cat))
diff = stats2.amount - stats1.amount
pct_change = (
float(diff / stats1.amount * 100)
if stats1.amount > 0 else (100.0 if stats2.amount > 0 else 0)
)
category_comparison[cat] = {
'period1': stats1.amount,
'period2': stats2.amount,
'diff_absolute': diff,
'diff_percent': pct_change,
'status': 'new' if stats1.amount == 0 else (
'removed' if stats2.amount == 0 else 'changed'
)
}
return {
'period1': {
'name': str(period1),
'total': agg1.total_amount,
'count': agg1.document_count,
'aggregation': agg1,
},
'period2': {
'name': str(period2),
'total': agg2.total_amount,
'count': agg2.document_count,
'aggregation': agg2,
},
'diff_absolute': diff_absolute,
'diff_percent': diff_percent,
'category_comparison': category_comparison,
}