diff --git a/dctp/.gitignore b/dctp/.gitignore new file mode 100644 index 0000000..37fbe4a --- /dev/null +++ b/dctp/.gitignore @@ -0,0 +1,5 @@ +__pycache__/ +*.pyc +*.pyo +.dctp_backups/ +.dctp_settings.json diff --git a/dctp/CLAUDE.md b/dctp/CLAUDE.md new file mode 100644 index 0000000..848b571 --- /dev/null +++ b/dctp/CLAUDE.md @@ -0,0 +1,102 @@ +# DCTP - Delta Code Transfer Protocol + +Du generierst Code im DCTP-Format fuer effiziente Uebertragung. + +## Regeln + +1. **Zeilennummern am Ende jeder Zeile** im passenden Kommentar-Format +2. **Immer mit ###FILE: beginnen** bei jedem Codeblock +3. **Bei Korrekturen NUR die geaenderten Zeilen senden**, nie den ganzen File + +## Zeilennummern-Format + +- Python/Shell: `code #Z1` +- JavaScript/Java/C/C++: `code //Z1` +- HTML: `code ` +- CSS: `code /*Z1*/` +- SQL: `code --Z1` + +## Befehle + +| Befehl | Syntax | Beschreibung | +|--------|--------|--------------| +| `###FILE:` | `###FILE:pfad/datei.ext` | Datei angeben | +| `###NEW` | | Neue Datei, kompletter Inhalt folgt | +| `###DELETE:` | `###DELETE:Z5-Z12` | Zeilen 5-12 loeschen | +| `###INSERT_AFTER:` | `###INSERT_AFTER:Z5` | Nach Zeile 5 einfuegen | +| `###REPLACE:` | `###REPLACE:Z5-Z8` | Zeilen 5-8 ersetzen | +| `###END` | | Ende des Blocks | +| `###RENUMBER` | | Zeilennummern neu berechnen | +| `###CHECKSUM:` | `###CHECKSUM:a3f2b8c1` | Optional: Hash zur Validierung | + +## Beispiel: Neue Datei + +``` +###FILE:src/calculator.py +###NEW +def add(a, b): #Z1 + return a + b #Z2 + #Z3 +def multiply(a, b): #Z4 + return a * b #Z5 +###END +``` + +## Beispiel: Korrektur (REPLACE) + +``` +###FILE:src/calculator.py +###REPLACE:Z4-Z5 +def multiply(a, b): #Z4 + """Multipliziert zwei Zahlen.""" #Z5 + return a * b #Z6 +###END +###RENUMBER +``` + +## Beispiel: Zeilen einfuegen + +``` +###FILE:src/calculator.py +###INSERT_AFTER:Z2 + #Z3 +def subtract(a, b): #Z4 + return a - b #Z5 +###END +###RENUMBER +``` + +## Beispiel: Zeilen loeschen + +``` +###FILE:src/calculator.py +###DELETE:Z10-Z15 +###RENUMBER +``` + +## Beispiel: Mehrere Dateien + +``` +###FILE:src/models/user.py +###NEW +class User: #Z1 + def __init__(self, name: str): #Z2 + self.name = name #Z3 +###END + +###FILE:src/models/order.py +###NEW +from .user import User #Z1 + #Z2 +class Order: #Z3 + def __init__(self, user: User): #Z4 + self.user = user #Z5 +###END +``` + +## Wichtig + +- Bei Korrekturen: NUR Delta senden, nie kompletten File +- Nach INSERT/DELETE/REPLACE immer ###RENUMBER +- Leerzeilen auch nummerieren +- Zeilennummern werden beim Schreiben automatisch entfernt diff --git a/dctp/README.md b/dctp/README.md new file mode 100644 index 0000000..cf83270 --- /dev/null +++ b/dctp/README.md @@ -0,0 +1,147 @@ +# DCTP - Delta Code Transfer Protocol + +Ein Tool um KI-generierten Code effizient in lokale Dateien zu uebertragen. Statt bei jeder Korrektur den kompletten Code neu zu senden, werden nur Aenderungen (Deltas) uebertragen. + +## Das Problem + +Claude generiert 500 Zeilen Code. Eine kleine Korrektur = nochmal 500 Zeilen. Verschwendung. + +## Die Loesung + +Zeilennummerierter Code + Steueranweisungen fuer gezielte Aenderungen. + +## Installation + +```bash +# Requirements installieren +pip install -r requirements.txt + +# GUI starten +python dctp_gui.py +``` + +## Schnellstart + +### 1. Projektverzeichnis waehlen + +Klicke auf "Waehlen" und waehle dein Projektverzeichnis. + +### 2. KI-Output einfuegen + +Kopiere den DCTP-formatierten Output aus deinem Claude-Chat in das Input-Feld. + +### 3. Analysieren + +Klicke "Analysieren" um eine Vorschau der Operationen zu sehen. + +### 4. Ausfuehren + +Klicke "Ausfuehren" um die Aenderungen auf deine Dateien anzuwenden. + +## DCTP-Format + +### Neue Datei erstellen + +``` +###FILE:src/calculator.py +###NEW +def add(a, b): #Z1 + return a + b #Z2 +###END +``` + +### Zeilen ersetzen + +``` +###FILE:src/calculator.py +###REPLACE:Z1-Z2 +def add(a: int, b: int) -> int: #Z1 + """Addiert zwei Zahlen.""" #Z2 + return a + b #Z3 +###END +###RENUMBER +``` + +### Zeilen einfuegen + +``` +###FILE:src/calculator.py +###INSERT_AFTER:Z2 + #Z3 +def subtract(a, b): #Z4 + return a - b #Z5 +###END +###RENUMBER +``` + +### Zeilen loeschen + +``` +###FILE:src/calculator.py +###DELETE:Z10-Z15 +###RENUMBER +``` + +## Zeilennummern-Format + +Die Zeilennummern werden automatisch entsprechend der Programmiersprache formatiert: + +| Sprache | Format | Beispiel | +|---------|--------|----------| +| Python | `#Z1` | `code #Z1` | +| JavaScript | `//Z1` | `code //Z1` | +| HTML | `` | `code ` | +| CSS | `/*Z1*/` | `code /*Z1*/` | +| SQL | `--Z1` | `code --Z1` | + +## Befehle + +| Befehl | Beschreibung | +|--------|--------------| +| `###FILE:pfad` | Zieldatei angeben | +| `###NEW` | Neue Datei erstellen | +| `###DELETE:Z5-Z12` | Zeilen loeschen | +| `###INSERT_AFTER:Z5` | Nach Zeile einfuegen | +| `###REPLACE:Z5-Z8` | Zeilen ersetzen | +| `###END` | Block beenden | +| `###RENUMBER` | Zeilennummern aktualisieren | +| `###CHECKSUM:hash` | Datei-Hash validieren | + +## Features + +- **Vorschau**: Zeigt was passieren wird, bevor es ausgefuehrt wird +- **Diff-Ansicht**: Zeigt Aenderungen farbig markiert (alt vs neu) +- **Undo**: Stellt den letzten Zustand wieder her +- **Backup**: Automatische Backups vor jeder Aenderung +- **Multi-File**: Mehrere Dateien in einem Durchgang bearbeiten +- **Checksum**: Optionale Validierung gegen externe Aenderungen + +## Einstellungen + +- **Projektpfad**: Standard-Projektverzeichnis +- **Backup-Verzeichnis**: Wo Backups gespeichert werden +- **Auto-Renumber**: Zeilennummern automatisch aktualisieren +- **Checksum-Validierung**: Externe Aenderungen erkennen +- **Theme**: Hell oder dunkel + +## Claude-Integration + +Kopiere den Inhalt von `CLAUDE.md` in deine Claude-Chats (als Custom Instructions oder am Anfang des Gespraechs), damit Claude im DCTP-Format antwortet. + +## Architektur + +``` +dctp/ +├── dctp_gui.py # Hauptfenster (CustomTkinter) +├── dctp_parser.py # Core-Logik: parse Steueranweisungen +├── dctp_executor.py # Fuehrt Operationen aus +├── dctp_backup.py # Undo/Backup-Verwaltung +├── dctp_diff.py # Diff-Berechnung fuer Vorschau +├── requirements.txt # Dependencies +├── CLAUDE.md # Anweisung fuer KI +└── README.md # Diese Datei +``` + +## Lizenz + +MIT License diff --git a/dctp/dctp_backup.py b/dctp/dctp_backup.py new file mode 100644 index 0000000..d0ae631 --- /dev/null +++ b/dctp/dctp_backup.py @@ -0,0 +1,305 @@ +""" +DCTP Backup Manager - Handles backup and undo functionality. + +Creates timestamped backups before operations and supports +restoring files to their previous state. +""" + +import json +import os +import shutil +from dataclasses import dataclass, asdict +from datetime import datetime +from pathlib import Path +from typing import Optional + + +@dataclass +class FileBackup: + """Represents a single file backup.""" + original: str + backup: str + existed: bool # Whether the file existed before (for new file handling) + + +@dataclass +class BackupSession: + """Represents a backup session (one execution run).""" + timestamp: str + files: list[FileBackup] + + +@dataclass +class BackupInfo: + """Info about a backup for display purposes.""" + timestamp: str + file_count: int + files: list[str] + + +class BackupManager: + """Manages file backups for undo functionality.""" + + BACKUP_DIR_NAME = ".dctp_backups" + MANIFEST_FILE = "manifest.json" + MAX_SESSIONS = 50 # Keep last 50 sessions + + def __init__(self, project_path: str): + """ + Initialize backup manager. + + Args: + project_path: Base project directory + """ + self.project_path = Path(project_path) + self.backup_dir = self.project_path / self.BACKUP_DIR_NAME + self.manifest_path = self.backup_dir / self.MANIFEST_FILE + self._current_session: Optional[BackupSession] = None + self._ensure_backup_dir() + + def _ensure_backup_dir(self) -> None: + """Create backup directory if it doesn't exist.""" + self.backup_dir.mkdir(parents=True, exist_ok=True) + + # Create .gitignore in backup dir + gitignore_path = self.backup_dir / ".gitignore" + if not gitignore_path.exists(): + gitignore_path.write_text("*\n") + + def _load_manifest(self) -> dict: + """Load the manifest file.""" + if self.manifest_path.exists(): + try: + return json.loads(self.manifest_path.read_text()) + except (json.JSONDecodeError, IOError): + return {"sessions": []} + return {"sessions": []} + + def _save_manifest(self, manifest: dict) -> None: + """Save the manifest file.""" + self.manifest_path.write_text(json.dumps(manifest, indent=2)) + + def start_session(self) -> None: + """Start a new backup session.""" + timestamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%S") + self._current_session = BackupSession(timestamp=timestamp, files=[]) + + def backup(self, file_path: str) -> Optional[str]: + """ + Create a backup of a file. + + Args: + file_path: Path to the file (relative to project or absolute) + + Returns: + Backup filename if successful, None if file doesn't exist + """ + if self._current_session is None: + self.start_session() + + # Normalize path + if os.path.isabs(file_path): + full_path = Path(file_path) + rel_path = full_path.relative_to(self.project_path) + else: + rel_path = Path(file_path) + full_path = self.project_path / rel_path + + # Check if file exists + existed = full_path.exists() + + if existed: + # Generate backup filename + timestamp = datetime.now().strftime("%Y-%m-%d_%H%M%S") + safe_name = str(rel_path).replace(os.sep, "_").replace("/", "_") + backup_name = f"{timestamp}_{safe_name}" + + # Copy file to backup + backup_path = self.backup_dir / backup_name + shutil.copy2(full_path, backup_path) + else: + backup_name = "" + + # Add to current session + self._current_session.files.append(FileBackup( + original=str(rel_path), + backup=backup_name, + existed=existed + )) + + return backup_name if existed else None + + def end_session(self) -> None: + """End the current backup session and save to manifest.""" + if self._current_session is None or len(self._current_session.files) == 0: + self._current_session = None + return + + manifest = self._load_manifest() + + # Convert to dict for JSON storage + session_dict = { + "timestamp": self._current_session.timestamp, + "files": [asdict(f) for f in self._current_session.files] + } + manifest["sessions"].append(session_dict) + + # Limit number of sessions + if len(manifest["sessions"]) > self.MAX_SESSIONS: + # Remove old sessions and their backup files + old_sessions = manifest["sessions"][:-self.MAX_SESSIONS] + for session in old_sessions: + for file_info in session["files"]: + backup_file = self.backup_dir / file_info["backup"] + if backup_file.exists(): + backup_file.unlink() + manifest["sessions"] = manifest["sessions"][-self.MAX_SESSIONS:] + + self._save_manifest(manifest) + self._current_session = None + + def restore_last(self) -> tuple[bool, list[str]]: + """ + Restore files from the last backup session. + + Returns: + Tuple of (success, list of restored files) + """ + manifest = self._load_manifest() + + if not manifest["sessions"]: + return False, [] + + # Get last session + last_session = manifest["sessions"].pop() + restored_files = [] + + for file_info in last_session["files"]: + original_path = self.project_path / file_info["original"] + + if file_info["existed"]: + # Restore from backup + backup_path = self.backup_dir / file_info["backup"] + if backup_path.exists(): + # Ensure parent directory exists + original_path.parent.mkdir(parents=True, exist_ok=True) + shutil.copy2(backup_path, original_path) + backup_path.unlink() # Remove backup file + restored_files.append(file_info["original"]) + else: + # File was newly created, delete it + if original_path.exists(): + original_path.unlink() + restored_files.append(f"{file_info['original']} (deleted)") + + self._save_manifest(manifest) + return True, restored_files + + def list_backups(self) -> list[BackupInfo]: + """ + List all backup sessions. + + Returns: + List of BackupInfo objects, newest first + """ + manifest = self._load_manifest() + backups = [] + + for session in reversed(manifest["sessions"]): + files = [f["original"] for f in session["files"]] + backups.append(BackupInfo( + timestamp=session["timestamp"], + file_count=len(files), + files=files + )) + + return backups + + def get_file_backup_path(self, file_path: str) -> Optional[Path]: + """ + Get the backup path for a file from the most recent session. + + Args: + file_path: Original file path (relative to project) + + Returns: + Path to backup file if found, None otherwise + """ + manifest = self._load_manifest() + + if not manifest["sessions"]: + return None + + # Search from newest to oldest + for session in reversed(manifest["sessions"]): + for file_info in session["files"]: + if file_info["original"] == file_path and file_info["existed"]: + backup_path = self.backup_dir / file_info["backup"] + if backup_path.exists(): + return backup_path + + return None + + def clear_all_backups(self) -> int: + """ + Clear all backups. + + Returns: + Number of backup files deleted + """ + count = 0 + if self.backup_dir.exists(): + for item in self.backup_dir.iterdir(): + if item.name != ".gitignore": + if item.is_file(): + item.unlink() + count += 1 + elif item.is_dir(): + shutil.rmtree(item) + count += 1 + + # Reset manifest + self._save_manifest({"sessions": []}) + return count + + +def main(): + """Test the backup manager.""" + import tempfile + + # Create a temporary project directory + with tempfile.TemporaryDirectory() as tmpdir: + # Create some test files + test_file = Path(tmpdir) / "test.py" + test_file.write_text("print('hello')\n") + + # Initialize backup manager + manager = BackupManager(tmpdir) + + # Start a session and backup the file + manager.start_session() + backup_name = manager.backup("test.py") + print(f"Created backup: {backup_name}") + + # Modify the file + test_file.write_text("print('modified')\n") + print(f"File content after modification: {test_file.read_text()}") + + # End session + manager.end_session() + + # List backups + backups = manager.list_backups() + print(f"Backup sessions: {len(backups)}") + for b in backups: + print(f" {b.timestamp}: {b.file_count} files") + + # Restore + success, restored = manager.restore_last() + print(f"Restore successful: {success}") + print(f"Restored files: {restored}") + print(f"File content after restore: {test_file.read_text()}") + + +if __name__ == "__main__": + main() diff --git a/dctp/dctp_diff.py b/dctp/dctp_diff.py new file mode 100644 index 0000000..9116928 --- /dev/null +++ b/dctp/dctp_diff.py @@ -0,0 +1,385 @@ +""" +DCTP Diff Generator - Generates diffs for preview display. + +Compares old and new content and produces colored diff output +for the GUI preview. +""" + +import difflib +from dataclasses import dataclass +from enum import Enum +from typing import Optional + + +class DiffType(Enum): + UNCHANGED = "unchanged" + ADDED = "added" + REMOVED = "removed" + CONTEXT = "context" + + +@dataclass +class DiffLine: + """Represents a single line in a diff.""" + type: DiffType + line_number_old: Optional[int] # Line number in old file + line_number_new: Optional[int] # Line number in new file + content: str + + @property + def prefix(self) -> str: + """Get the diff prefix character.""" + if self.type == DiffType.ADDED: + return "+" + elif self.type == DiffType.REMOVED: + return "-" + else: + return " " + + def __str__(self) -> str: + old_num = str(self.line_number_old) if self.line_number_old else "" + new_num = str(self.line_number_new) if self.line_number_new else "" + return f"{old_num:>4} {new_num:>4} {self.prefix} {self.content}" + + +@dataclass +class DiffBlock: + """A block of related diff lines.""" + start_old: int + end_old: int + start_new: int + end_new: int + lines: list[DiffLine] + + @property + def header(self) -> str: + """Generate a unified diff style header.""" + return f"@@ -{self.start_old},{self.end_old - self.start_old + 1} +{self.start_new},{self.end_new - self.start_new + 1} @@" + + +@dataclass +class FileDiff: + """Complete diff for a file.""" + filename: str + old_content: list[str] + new_content: list[str] + blocks: list[DiffBlock] + lines: list[DiffLine] + + @property + def has_changes(self) -> bool: + return any(line.type in (DiffType.ADDED, DiffType.REMOVED) for line in self.lines) + + @property + def additions(self) -> int: + return sum(1 for line in self.lines if line.type == DiffType.ADDED) + + @property + def deletions(self) -> int: + return sum(1 for line in self.lines if line.type == DiffType.REMOVED) + + +class DiffGenerator: + """Generates diffs between old and new content.""" + + def __init__(self, context_lines: int = 3): + """ + Initialize diff generator. + + Args: + context_lines: Number of context lines around changes + """ + self.context_lines = context_lines + + def generate( + self, + old_lines: list[str], + new_lines: list[str], + filename: str = "" + ) -> FileDiff: + """ + Generate a diff between old and new content. + + Args: + old_lines: Original content lines + new_lines: New content lines + filename: Optional filename for display + + Returns: + FileDiff object with all diff information + """ + diff_lines: list[DiffLine] = [] + + # Use difflib to compute differences + matcher = difflib.SequenceMatcher(None, old_lines, new_lines) + + old_line_num = 1 + new_line_num = 1 + + for tag, i1, i2, j1, j2 in matcher.get_opcodes(): + if tag == 'equal': + for idx in range(i2 - i1): + diff_lines.append(DiffLine( + type=DiffType.UNCHANGED, + line_number_old=old_line_num, + line_number_new=new_line_num, + content=old_lines[i1 + idx] + )) + old_line_num += 1 + new_line_num += 1 + + elif tag == 'replace': + # Show removed lines first, then added + for idx in range(i2 - i1): + diff_lines.append(DiffLine( + type=DiffType.REMOVED, + line_number_old=old_line_num, + line_number_new=None, + content=old_lines[i1 + idx] + )) + old_line_num += 1 + + for idx in range(j2 - j1): + diff_lines.append(DiffLine( + type=DiffType.ADDED, + line_number_old=None, + line_number_new=new_line_num, + content=new_lines[j1 + idx] + )) + new_line_num += 1 + + elif tag == 'delete': + for idx in range(i2 - i1): + diff_lines.append(DiffLine( + type=DiffType.REMOVED, + line_number_old=old_line_num, + line_number_new=None, + content=old_lines[i1 + idx] + )) + old_line_num += 1 + + elif tag == 'insert': + for idx in range(j2 - j1): + diff_lines.append(DiffLine( + type=DiffType.ADDED, + line_number_old=None, + line_number_new=new_line_num, + content=new_lines[j1 + idx] + )) + new_line_num += 1 + + # Generate blocks with context + blocks = self._generate_blocks(diff_lines) + + return FileDiff( + filename=filename, + old_content=old_lines, + new_content=new_lines, + blocks=blocks, + lines=diff_lines + ) + + def _generate_blocks(self, diff_lines: list[DiffLine]) -> list[DiffBlock]: + """Generate diff blocks with context.""" + if not diff_lines: + return [] + + blocks: list[DiffBlock] = [] + current_block_lines: list[DiffLine] = [] + in_change = False + unchanged_count = 0 + + for line in diff_lines: + is_change = line.type in (DiffType.ADDED, DiffType.REMOVED) + + if is_change: + if not in_change: + # Starting a new change block, include context + in_change = True + unchanged_count = 0 + current_block_lines.append(line) + + else: # Unchanged line + if in_change: + unchanged_count += 1 + if unchanged_count <= self.context_lines: + current_block_lines.append(line) + else: + # End current block and start fresh + if current_block_lines: + blocks.append(self._create_block(current_block_lines)) + current_block_lines = [] + in_change = False + unchanged_count = 0 + else: + # Keep track of potential context lines + current_block_lines.append(line) + if len(current_block_lines) > self.context_lines: + current_block_lines.pop(0) + + # Don't forget the last block + if current_block_lines and any( + l.type in (DiffType.ADDED, DiffType.REMOVED) for l in current_block_lines + ): + blocks.append(self._create_block(current_block_lines)) + + return blocks + + def _create_block(self, lines: list[DiffLine]) -> DiffBlock: + """Create a DiffBlock from a list of lines.""" + old_nums = [l.line_number_old for l in lines if l.line_number_old is not None] + new_nums = [l.line_number_new for l in lines if l.line_number_new is not None] + + return DiffBlock( + start_old=min(old_nums) if old_nums else 0, + end_old=max(old_nums) if old_nums else 0, + start_new=min(new_nums) if new_nums else 0, + end_new=max(new_nums) if new_nums else 0, + lines=lines + ) + + def generate_unified_diff( + self, + old_lines: list[str], + new_lines: list[str], + old_filename: str = "a/file", + new_filename: str = "b/file" + ) -> str: + """ + Generate a unified diff string. + + Args: + old_lines: Original content lines + new_lines: New content lines + old_filename: Label for old file + new_filename: Label for new file + + Returns: + Unified diff as string + """ + diff = difflib.unified_diff( + old_lines, + new_lines, + fromfile=old_filename, + tofile=new_filename, + lineterm="" + ) + return "\n".join(diff) + + def generate_side_by_side( + self, + old_lines: list[str], + new_lines: list[str], + width: int = 80 + ) -> list[tuple[str, str, str]]: + """ + Generate a side-by-side diff representation. + + Args: + old_lines: Original content lines + new_lines: New content lines + width: Width for each column + + Returns: + List of tuples (left_line, marker, right_line) + """ + result = [] + half_width = (width - 3) // 2 + + matcher = difflib.SequenceMatcher(None, old_lines, new_lines) + + for tag, i1, i2, j1, j2 in matcher.get_opcodes(): + if tag == 'equal': + for idx in range(i2 - i1): + line = old_lines[i1 + idx][:half_width] + result.append((line, " ", line)) + + elif tag == 'replace': + max_len = max(i2 - i1, j2 - j1) + for idx in range(max_len): + old_line = old_lines[i1 + idx][:half_width] if idx < i2 - i1 else "" + new_line = new_lines[j1 + idx][:half_width] if idx < j2 - j1 else "" + result.append((old_line, "|", new_line)) + + elif tag == 'delete': + for idx in range(i2 - i1): + old_line = old_lines[i1 + idx][:half_width] + result.append((old_line, "<", "")) + + elif tag == 'insert': + for idx in range(j2 - j1): + new_line = new_lines[j1 + idx][:half_width] + result.append(("", ">", new_line)) + + return result + + +def format_diff_for_display(diff: FileDiff, use_colors: bool = True) -> str: + """ + Format a FileDiff for terminal/GUI display. + + Args: + diff: The FileDiff to format + use_colors: Whether to use ANSI colors + + Returns: + Formatted string + """ + lines = [] + + if diff.filename: + lines.append(f"--- {diff.filename}") + lines.append(f"+++ {diff.filename}") + + for line in diff.lines: + if use_colors: + if line.type == DiffType.ADDED: + prefix = "\033[32m+" # Green + suffix = "\033[0m" + elif line.type == DiffType.REMOVED: + prefix = "\033[31m-" # Red + suffix = "\033[0m" + else: + prefix = " " + suffix = "" + else: + prefix = line.prefix + suffix = "" + + lines.append(f"{prefix} {line.content}{suffix}") + + return "\n".join(lines) + + +def main(): + """Test the diff generator.""" + old_content = [ + "def calculate_tax(amount):", + " rate = 0.19", + " if amount > 1000:", + " rate = 0.25", + " return amount * rate", + ] + + new_content = [ + "def calculate_tax(amount):", + " rate = 0.19", + " if amount > 10000:", + " rate = 0.22", + " elif amount > 1000:", + " rate = 0.19", + " return amount * rate", + ] + + generator = DiffGenerator() + diff = generator.generate(old_content, new_content, "calculator.py") + + print(f"File: {diff.filename}") + print(f"Additions: {diff.additions}, Deletions: {diff.deletions}") + print() + print("Diff output:") + print(format_diff_for_display(diff)) + + +if __name__ == "__main__": + main() diff --git a/dctp/dctp_executor.py b/dctp/dctp_executor.py new file mode 100644 index 0000000..5272b4a --- /dev/null +++ b/dctp/dctp_executor.py @@ -0,0 +1,584 @@ +""" +DCTP Executor - Executes DCTP operations on files. + +Handles CREATE, DELETE, INSERT_AFTER, REPLACE, and RENUMBER operations +with backup support and checksum validation. +""" + +import hashlib +import os +import re +from dataclasses import dataclass +from enum import Enum +from pathlib import Path +from typing import Optional + +from dctp_parser import DCTPParser, Operation, OperationType +from dctp_backup import BackupManager +from dctp_diff import DiffGenerator, FileDiff + + +class ResultStatus(Enum): + SUCCESS = "success" + WARNING = "warning" + ERROR = "error" + SKIPPED = "skipped" + + +@dataclass +class ExecutionResult: + """Result of executing a single operation.""" + status: ResultStatus + operation: Operation + message: str + diff: Optional[FileDiff] = None + + def __str__(self) -> str: + status_symbols = { + ResultStatus.SUCCESS: "✅", + ResultStatus.WARNING: "⚠️", + ResultStatus.ERROR: "❌", + ResultStatus.SKIPPED: "⏭️", + } + return f"{status_symbols[self.status]} {self.message}" + + +@dataclass +class PreviewResult: + """Result of previewing operations before execution.""" + operation: Operation + description: str + diff: Optional[FileDiff] = None + warnings: list[str] = None + + def __post_init__(self): + if self.warnings is None: + self.warnings = [] + + +class DCTPExecutor: + """Executes DCTP operations on files.""" + + # Line number patterns (same as parser) + LINE_NUMBER_PATTERNS = [ + re.compile(r'\s*#Z(\d+)\s*$'), + re.compile(r'\s*//Z(\d+)\s*$'), + re.compile(r'\s*\s*$'), + re.compile(r'\s*/\*Z(\d+)\*/\s*$'), + re.compile(r'\s*--Z(\d+)\s*$'), + ] + + def __init__( + self, + project_path: str, + backup_manager: Optional[BackupManager] = None, + auto_renumber: bool = True, + validate_checksums: bool = True + ): + """ + Initialize the executor. + + Args: + project_path: Base project directory + backup_manager: Optional backup manager for undo support + auto_renumber: Automatically renumber after operations + validate_checksums: Validate checksums before operations + """ + self.project_path = Path(project_path) + self.backup_manager = backup_manager or BackupManager(project_path) + self.auto_renumber = auto_renumber + self.validate_checksums = validate_checksums + self.diff_generator = DiffGenerator() + self.parser = DCTPParser() + + def preview(self, operations: list[Operation]) -> list[PreviewResult]: + """ + Preview operations without executing them. + + Args: + operations: List of operations to preview + + Returns: + List of preview results + """ + previews = [] + + for op in operations: + preview = self._preview_operation(op) + previews.append(preview) + + return previews + + def _preview_operation(self, op: Operation) -> PreviewResult: + """Generate preview for a single operation.""" + file_path = self.project_path / op.file + warnings = [] + + if op.type == OperationType.NEW: + if file_path.exists(): + warnings.append(f"File already exists and will be overwritten") + return PreviewResult( + operation=op, + description=f"CREATE {op.file} ({len(op.content)} lines)", + warnings=warnings + ) + + elif op.type == OperationType.DELETE: + if not file_path.exists(): + warnings.append(f"File does not exist") + return PreviewResult( + operation=op, + description=f"DELETE {op.file} Z{op.start_line}-Z{op.end_line} (file not found)", + warnings=warnings + ) + + old_lines = self._read_file_lines(file_path) + if op.end_line > len(old_lines): + warnings.append(f"Line range exceeds file length ({len(old_lines)} lines)") + + new_lines = old_lines.copy() + start_idx = op.start_line - 1 + end_idx = min(op.end_line, len(old_lines)) + del new_lines[start_idx:end_idx] + + diff = self.diff_generator.generate(old_lines, new_lines, op.file) + return PreviewResult( + operation=op, + description=f"DELETE {op.file} Z{op.start_line}-Z{op.end_line}", + diff=diff, + warnings=warnings + ) + + elif op.type == OperationType.INSERT_AFTER: + if not file_path.exists(): + warnings.append(f"File does not exist") + return PreviewResult( + operation=op, + description=f"INSERT_AFTER {op.file} Z{op.start_line} (file not found)", + warnings=warnings + ) + + old_lines = self._read_file_lines(file_path) + if op.start_line > len(old_lines): + warnings.append(f"Line {op.start_line} exceeds file length ({len(old_lines)} lines)") + + new_lines = old_lines.copy() + insert_idx = min(op.start_line, len(old_lines)) + for i, line in enumerate(op.content): + new_lines.insert(insert_idx + i, line) + + diff = self.diff_generator.generate(old_lines, new_lines, op.file) + return PreviewResult( + operation=op, + description=f"INSERT_AFTER {op.file} Z{op.start_line} ({len(op.content)} lines)", + diff=diff, + warnings=warnings + ) + + elif op.type == OperationType.REPLACE: + if not file_path.exists(): + warnings.append(f"File does not exist") + return PreviewResult( + operation=op, + description=f"REPLACE {op.file} Z{op.start_line}-Z{op.end_line} (file not found)", + warnings=warnings + ) + + old_lines = self._read_file_lines(file_path) + if op.end_line > len(old_lines): + warnings.append(f"Line range exceeds file length ({len(old_lines)} lines)") + + new_lines = old_lines.copy() + start_idx = op.start_line - 1 + end_idx = min(op.end_line, len(old_lines)) + new_lines[start_idx:end_idx] = op.content + + diff = self.diff_generator.generate(old_lines, new_lines, op.file) + return PreviewResult( + operation=op, + description=f"REPLACE {op.file} Z{op.start_line}-Z{op.end_line} ({len(op.content)} lines)", + diff=diff, + warnings=warnings + ) + + elif op.type == OperationType.RENUMBER: + return PreviewResult( + operation=op, + description=f"RENUMBER {op.file}", + warnings=warnings + ) + + return PreviewResult( + operation=op, + description=f"UNKNOWN {op.type}", + warnings=["Unknown operation type"] + ) + + def execute( + self, + operations: list[Operation], + skip_checksum_mismatch: bool = False + ) -> list[ExecutionResult]: + """ + Execute a list of operations. + + Args: + operations: List of operations to execute + skip_checksum_mismatch: Continue even if checksums don't match + + Returns: + List of execution results + """ + results = [] + + # Start backup session + self.backup_manager.start_session() + + try: + for op in operations: + result = self._execute_operation(op, skip_checksum_mismatch) + results.append(result) + + # Stop on error + if result.status == ResultStatus.ERROR: + break + + finally: + # End backup session + self.backup_manager.end_session() + + return results + + def _execute_operation( + self, + op: Operation, + skip_checksum_mismatch: bool = False + ) -> ExecutionResult: + """Execute a single operation.""" + file_path = self.project_path / op.file + + try: + if op.type == OperationType.NEW: + return self._execute_new(op, file_path) + elif op.type == OperationType.DELETE: + return self._execute_delete(op, file_path, skip_checksum_mismatch) + elif op.type == OperationType.INSERT_AFTER: + return self._execute_insert_after(op, file_path, skip_checksum_mismatch) + elif op.type == OperationType.REPLACE: + return self._execute_replace(op, file_path, skip_checksum_mismatch) + elif op.type == OperationType.RENUMBER: + return self._execute_renumber(op, file_path) + else: + return ExecutionResult( + status=ResultStatus.ERROR, + operation=op, + message=f"Unknown operation type: {op.type}" + ) + except PermissionError: + return ExecutionResult( + status=ResultStatus.ERROR, + operation=op, + message=f"Permission denied: {file_path}" + ) + except Exception as e: + return ExecutionResult( + status=ResultStatus.ERROR, + operation=op, + message=f"Error: {str(e)}" + ) + + def _execute_new(self, op: Operation, file_path: Path) -> ExecutionResult: + """Execute a NEW operation (create file).""" + # Backup if file exists + if file_path.exists(): + self.backup_manager.backup(str(op.file)) + + # Create parent directories + file_path.parent.mkdir(parents=True, exist_ok=True) + + # Write content + content = "\n".join(op.content) + if op.content and not content.endswith("\n"): + content += "\n" + file_path.write_text(content) + + return ExecutionResult( + status=ResultStatus.SUCCESS, + operation=op, + message=f"CREATE {op.file} ({len(op.content)} lines)" + ) + + def _execute_delete( + self, + op: Operation, + file_path: Path, + skip_checksum_mismatch: bool + ) -> ExecutionResult: + """Execute a DELETE operation.""" + if not file_path.exists(): + return ExecutionResult( + status=ResultStatus.WARNING, + operation=op, + message=f"File not found: {op.file}" + ) + + # Validate checksum if provided + if op.checksum and self.validate_checksums: + if not self._validate_checksum(file_path, op.checksum): + if not skip_checksum_mismatch: + return ExecutionResult( + status=ResultStatus.WARNING, + operation=op, + message=f"Checksum mismatch for {op.file} - file was modified externally" + ) + + # Backup file + self.backup_manager.backup(str(op.file)) + + # Read file and delete lines + lines = self._read_file_lines(file_path) + old_lines = lines.copy() + + if op.end_line > len(lines): + return ExecutionResult( + status=ResultStatus.WARNING, + operation=op, + message=f"Line range Z{op.start_line}-Z{op.end_line} exceeds file length ({len(lines)} lines)" + ) + + start_idx = op.start_line - 1 + end_idx = op.end_line + del lines[start_idx:end_idx] + + # Write back + self._write_file_lines(file_path, lines) + + diff = self.diff_generator.generate(old_lines, lines, op.file) + + return ExecutionResult( + status=ResultStatus.SUCCESS, + operation=op, + message=f"DELETE {op.file} Z{op.start_line}-Z{op.end_line}", + diff=diff + ) + + def _execute_insert_after( + self, + op: Operation, + file_path: Path, + skip_checksum_mismatch: bool + ) -> ExecutionResult: + """Execute an INSERT_AFTER operation.""" + if not file_path.exists(): + return ExecutionResult( + status=ResultStatus.WARNING, + operation=op, + message=f"File not found: {op.file}" + ) + + # Validate checksum if provided + if op.checksum and self.validate_checksums: + if not self._validate_checksum(file_path, op.checksum): + if not skip_checksum_mismatch: + return ExecutionResult( + status=ResultStatus.WARNING, + operation=op, + message=f"Checksum mismatch for {op.file} - file was modified externally" + ) + + # Backup file + self.backup_manager.backup(str(op.file)) + + # Read file and insert lines + lines = self._read_file_lines(file_path) + old_lines = lines.copy() + + if op.start_line > len(lines): + return ExecutionResult( + status=ResultStatus.WARNING, + operation=op, + message=f"Line Z{op.start_line} exceeds file length ({len(lines)} lines)" + ) + + insert_idx = op.start_line + for i, line in enumerate(op.content): + lines.insert(insert_idx + i, line) + + # Write back + self._write_file_lines(file_path, lines) + + diff = self.diff_generator.generate(old_lines, lines, op.file) + + return ExecutionResult( + status=ResultStatus.SUCCESS, + operation=op, + message=f"INSERT_AFTER {op.file} Z{op.start_line} ({len(op.content)} lines)", + diff=diff + ) + + def _execute_replace( + self, + op: Operation, + file_path: Path, + skip_checksum_mismatch: bool + ) -> ExecutionResult: + """Execute a REPLACE operation.""" + if not file_path.exists(): + return ExecutionResult( + status=ResultStatus.WARNING, + operation=op, + message=f"File not found: {op.file}" + ) + + # Validate checksum if provided + if op.checksum and self.validate_checksums: + if not self._validate_checksum(file_path, op.checksum): + if not skip_checksum_mismatch: + return ExecutionResult( + status=ResultStatus.WARNING, + operation=op, + message=f"Checksum mismatch for {op.file} - file was modified externally" + ) + + # Backup file + self.backup_manager.backup(str(op.file)) + + # Read file and replace lines + lines = self._read_file_lines(file_path) + old_lines = lines.copy() + + if op.end_line > len(lines): + return ExecutionResult( + status=ResultStatus.WARNING, + operation=op, + message=f"Line range Z{op.start_line}-Z{op.end_line} exceeds file length ({len(lines)} lines)" + ) + + start_idx = op.start_line - 1 + end_idx = op.end_line + lines[start_idx:end_idx] = op.content + + # Write back + self._write_file_lines(file_path, lines) + + diff = self.diff_generator.generate(old_lines, lines, op.file) + + return ExecutionResult( + status=ResultStatus.SUCCESS, + operation=op, + message=f"REPLACE {op.file} Z{op.start_line}-Z{op.end_line} ({len(op.content)} lines)", + diff=diff + ) + + def _execute_renumber(self, op: Operation, file_path: Path) -> ExecutionResult: + """Execute a RENUMBER operation.""" + if not file_path.exists(): + return ExecutionResult( + status=ResultStatus.WARNING, + operation=op, + message=f"File not found: {op.file}" + ) + + # No backup needed for renumber (just updates line numbers) + lines = self._read_file_lines(file_path) + renumbered_lines = [] + + for i, line in enumerate(lines, 1): + # Remove existing line number + clean_line = self._remove_line_number(line) + # Add new line number + suffix = self.parser.get_line_number_suffix(op.file, i) + renumbered_lines.append(clean_line + suffix) + + self._write_file_lines(file_path, renumbered_lines) + + return ExecutionResult( + status=ResultStatus.SUCCESS, + operation=op, + message=f"RENUMBER {op.file} ({len(lines)} lines)" + ) + + def _read_file_lines(self, file_path: Path) -> list[str]: + """Read file and return lines without trailing newlines.""" + content = file_path.read_text() + lines = content.split('\n') + # Remove trailing empty line if file ends with newline + if lines and lines[-1] == '': + lines = lines[:-1] + return lines + + def _write_file_lines(self, file_path: Path, lines: list[str]) -> None: + """Write lines to file with trailing newline.""" + content = '\n'.join(lines) + if lines and not content.endswith('\n'): + content += '\n' + file_path.write_text(content) + + def _remove_line_number(self, line: str) -> str: + """Remove line number marker from end of line.""" + for pattern in self.LINE_NUMBER_PATTERNS: + match = pattern.search(line) + if match: + return line[:match.start()] + return line + + def _validate_checksum(self, file_path: Path, expected: str) -> bool: + """Validate file checksum.""" + content = file_path.read_bytes() + actual = hashlib.md5(content).hexdigest()[:8] + return actual.lower() == expected.lower() + + @staticmethod + def calculate_checksum(file_path: Path) -> str: + """Calculate checksum for a file.""" + content = file_path.read_bytes() + return hashlib.md5(content).hexdigest()[:8] + + +def main(): + """Test the executor.""" + import tempfile + + test_input = """###FILE:calculator.py +###NEW +def add(a, b): #Z1 + return a + b #Z2 + #Z3 +def multiply(a, b): #Z4 + return a * b #Z5 +###END +""" + + with tempfile.TemporaryDirectory() as tmpdir: + parser = DCTPParser() + result = parser.parse(test_input) + + print("Parsed operations:") + for op in result.operations: + print(f" {op}") + + executor = DCTPExecutor(tmpdir) + + # Preview + print("\nPreviews:") + previews = executor.preview(result.operations) + for preview in previews: + print(f" {preview.description}") + if preview.warnings: + for w in preview.warnings: + print(f" ⚠️ {w}") + + # Execute + print("\nExecution:") + exec_results = executor.execute(result.operations) + for r in exec_results: + print(f" {r}") + + # Verify file was created + file_path = Path(tmpdir) / "calculator.py" + if file_path.exists(): + print(f"\nFile content:\n{file_path.read_text()}") + + +if __name__ == "__main__": + main() diff --git a/dctp/dctp_gui.py b/dctp/dctp_gui.py new file mode 100644 index 0000000..5861013 --- /dev/null +++ b/dctp/dctp_gui.py @@ -0,0 +1,666 @@ +#!/usr/bin/env python3 +""" +DCTP GUI - Delta Code Transfer Protocol graphical user interface. + +A CustomTkinter-based GUI for managing AI-generated code transfers +using delta operations for efficient updates. +""" + +import os +import sys +from datetime import datetime +from pathlib import Path +from tkinter import filedialog, messagebox +from typing import Optional +import json + +import customtkinter as ctk + +from dctp_parser import DCTPParser, ParseResult, Operation, OperationType +from dctp_executor import DCTPExecutor, ExecutionResult, PreviewResult, ResultStatus +from dctp_backup import BackupManager +from dctp_diff import DiffType + + +class SettingsDialog(ctk.CTkToplevel): + """Settings dialog window.""" + + def __init__(self, parent, settings: dict): + super().__init__(parent) + + self.title("Einstellungen") + self.geometry("500x400") + self.resizable(False, False) + + self.settings = settings.copy() + self.result = None + + # Make modal + self.transient(parent) + self.grab_set() + + self._create_widgets() + + # Center on parent + self.update_idletasks() + x = parent.winfo_x() + (parent.winfo_width() - self.winfo_width()) // 2 + y = parent.winfo_y() + (parent.winfo_height() - self.winfo_height()) // 2 + self.geometry(f"+{x}+{y}") + + def _create_widgets(self): + # Main frame + main_frame = ctk.CTkFrame(self) + main_frame.pack(fill="both", expand=True, padx=20, pady=20) + + # Project path + ctk.CTkLabel(main_frame, text="Standard-Projektpfad:").pack(anchor="w", pady=(0, 5)) + path_frame = ctk.CTkFrame(main_frame) + path_frame.pack(fill="x", pady=(0, 15)) + + self.path_entry = ctk.CTkEntry(path_frame, width=350) + self.path_entry.pack(side="left", fill="x", expand=True, padx=(0, 10)) + self.path_entry.insert(0, self.settings.get("project_path", "")) + + ctk.CTkButton( + path_frame, + text="...", + width=40, + command=self._browse_path + ).pack(side="right") + + # Backup directory + ctk.CTkLabel(main_frame, text="Backup-Verzeichnis:").pack(anchor="w", pady=(0, 5)) + backup_frame = ctk.CTkFrame(main_frame) + backup_frame.pack(fill="x", pady=(0, 15)) + + self.backup_entry = ctk.CTkEntry(backup_frame, width=350) + self.backup_entry.pack(side="left", fill="x", expand=True, padx=(0, 10)) + self.backup_entry.insert(0, self.settings.get("backup_dir", ".dctp_backups")) + + ctk.CTkButton( + backup_frame, + text="...", + width=40, + command=self._browse_backup + ).pack(side="right") + + # Options + options_frame = ctk.CTkFrame(main_frame) + options_frame.pack(fill="x", pady=15) + + self.auto_renumber_var = ctk.BooleanVar( + value=self.settings.get("auto_renumber", True) + ) + ctk.CTkCheckBox( + options_frame, + text="Auto-Renumber nach Operationen", + variable=self.auto_renumber_var + ).pack(anchor="w", pady=5) + + self.validate_checksum_var = ctk.BooleanVar( + value=self.settings.get("validate_checksum", True) + ) + ctk.CTkCheckBox( + options_frame, + text="Checksum-Validierung aktiviert", + variable=self.validate_checksum_var + ).pack(anchor="w", pady=5) + + # Theme + ctk.CTkLabel(main_frame, text="Theme:").pack(anchor="w", pady=(15, 5)) + self.theme_var = ctk.StringVar(value=self.settings.get("theme", "dark")) + theme_frame = ctk.CTkFrame(main_frame) + theme_frame.pack(fill="x", pady=(0, 15)) + + ctk.CTkRadioButton( + theme_frame, + text="Dunkel", + variable=self.theme_var, + value="dark" + ).pack(side="left", padx=(0, 20)) + + ctk.CTkRadioButton( + theme_frame, + text="Hell", + variable=self.theme_var, + value="light" + ).pack(side="left") + + # Buttons + button_frame = ctk.CTkFrame(main_frame) + button_frame.pack(fill="x", pady=(20, 0)) + + ctk.CTkButton( + button_frame, + text="Abbrechen", + command=self._cancel + ).pack(side="right", padx=(10, 0)) + + ctk.CTkButton( + button_frame, + text="Speichern", + command=self._save + ).pack(side="right") + + def _browse_path(self): + path = filedialog.askdirectory( + initialdir=self.path_entry.get() or os.path.expanduser("~") + ) + if path: + self.path_entry.delete(0, "end") + self.path_entry.insert(0, path) + + def _browse_backup(self): + path = filedialog.askdirectory( + initialdir=os.path.expanduser("~") + ) + if path: + self.backup_entry.delete(0, "end") + self.backup_entry.insert(0, path) + + def _save(self): + self.result = { + "project_path": self.path_entry.get(), + "backup_dir": self.backup_entry.get(), + "auto_renumber": self.auto_renumber_var.get(), + "validate_checksum": self.validate_checksum_var.get(), + "theme": self.theme_var.get() + } + self.destroy() + + def _cancel(self): + self.result = None + self.destroy() + + +class DCTPApp(ctk.CTk): + """Main DCTP application window.""" + + SETTINGS_FILE = ".dctp_settings.json" + + def __init__(self): + super().__init__() + + self.title("DCTP - Delta Code Transfer") + self.geometry("1200x900") + self.minsize(800, 600) + + # Initialize components + self.parser = DCTPParser() + self.executor: Optional[DCTPExecutor] = None + self.backup_manager: Optional[BackupManager] = None + self.current_operations: list[Operation] = [] + self.current_previews: list[PreviewResult] = [] + + # Load settings + self.settings = self._load_settings() + ctk.set_appearance_mode(self.settings.get("theme", "dark")) + + # Create UI + self._create_widgets() + + # Initialize project if path is set + if self.settings.get("project_path"): + self._init_project(self.settings["project_path"]) + + self._log("Bereit") + + def _load_settings(self) -> dict: + """Load settings from file.""" + settings_path = Path.home() / self.SETTINGS_FILE + if settings_path.exists(): + try: + return json.loads(settings_path.read_text()) + except (json.JSONDecodeError, IOError): + pass + return { + "project_path": "", + "backup_dir": ".dctp_backups", + "auto_renumber": True, + "validate_checksum": True, + "theme": "dark" + } + + def _save_settings(self): + """Save settings to file.""" + settings_path = Path.home() / self.SETTINGS_FILE + settings_path.write_text(json.dumps(self.settings, indent=2)) + + def _create_widgets(self): + """Create all UI widgets.""" + # Top bar - project selection + top_frame = ctk.CTkFrame(self) + top_frame.pack(fill="x", padx=10, pady=10) + + ctk.CTkLabel(top_frame, text="Projekt:").pack(side="left", padx=(0, 10)) + + self.project_entry = ctk.CTkEntry(top_frame, width=400) + self.project_entry.pack(side="left", fill="x", expand=True, padx=(0, 10)) + self.project_entry.insert(0, self.settings.get("project_path", "")) + + ctk.CTkButton( + top_frame, + text="Waehlen", + width=100, + command=self._browse_project + ).pack(side="left", padx=(0, 10)) + + ctk.CTkButton( + top_frame, + text="Einstellungen", + width=100, + command=self._open_settings + ).pack(side="left") + + # Main content area with paned layout + content_frame = ctk.CTkFrame(self) + content_frame.pack(fill="both", expand=True, padx=10, pady=(0, 10)) + + # Left side - Input and preview + left_frame = ctk.CTkFrame(content_frame) + left_frame.pack(side="left", fill="both", expand=True, padx=(0, 5)) + + # Input area + input_label_frame = ctk.CTkFrame(left_frame) + input_label_frame.pack(fill="x", pady=(5, 5), padx=5) + ctk.CTkLabel( + input_label_frame, + text="Input (KI-Output hier einfuegen)", + font=ctk.CTkFont(weight="bold") + ).pack(side="left") + + self.input_text = ctk.CTkTextbox( + left_frame, + height=250, + font=ctk.CTkFont(family="Consolas", size=12) + ) + self.input_text.pack(fill="both", expand=True, padx=5, pady=(0, 10)) + + # Buttons + button_frame = ctk.CTkFrame(left_frame) + button_frame.pack(fill="x", padx=5, pady=(0, 10)) + + self.analyze_btn = ctk.CTkButton( + button_frame, + text="Analysieren", + command=self._analyze, + width=120 + ) + self.analyze_btn.pack(side="left", padx=(0, 10)) + + self.execute_btn = ctk.CTkButton( + button_frame, + text="Ausfuehren", + command=self._execute, + width=120, + state="disabled" + ) + self.execute_btn.pack(side="left", padx=(0, 10)) + + self.undo_btn = ctk.CTkButton( + button_frame, + text="Undo", + command=self._undo, + width=80 + ) + self.undo_btn.pack(side="left", padx=(0, 10)) + + ctk.CTkButton( + button_frame, + text="Clear", + command=self._clear, + width=80 + ).pack(side="left") + + # Preview operations + preview_label_frame = ctk.CTkFrame(left_frame) + preview_label_frame.pack(fill="x", pady=(5, 5), padx=5) + ctk.CTkLabel( + preview_label_frame, + text="Vorschau Operationen", + font=ctk.CTkFont(weight="bold") + ).pack(side="left") + + self.preview_text = ctk.CTkTextbox( + left_frame, + height=150, + font=ctk.CTkFont(family="Consolas", size=11) + ) + self.preview_text.pack(fill="both", expand=True, padx=5, pady=(0, 10)) + self.preview_text.configure(state="disabled") + + # Right side - Diff and file tree + right_frame = ctk.CTkFrame(content_frame) + right_frame.pack(side="right", fill="both", expand=True, padx=(5, 0)) + + # Diff view + diff_label_frame = ctk.CTkFrame(right_frame) + diff_label_frame.pack(fill="x", pady=(5, 5), padx=5) + ctk.CTkLabel( + diff_label_frame, + text="Diff-Ansicht", + font=ctk.CTkFont(weight="bold") + ).pack(side="left") + + self.diff_text = ctk.CTkTextbox( + right_frame, + height=300, + font=ctk.CTkFont(family="Consolas", size=11) + ) + self.diff_text.pack(fill="both", expand=True, padx=5, pady=(0, 10)) + self.diff_text.configure(state="disabled") + + # File tree + tree_label_frame = ctk.CTkFrame(right_frame) + tree_label_frame.pack(fill="x", pady=(5, 5), padx=5) + ctk.CTkLabel( + tree_label_frame, + text="Projektdateien", + font=ctk.CTkFont(weight="bold") + ).pack(side="left") + + ctk.CTkButton( + tree_label_frame, + text="Aktualisieren", + width=80, + command=self._refresh_file_tree + ).pack(side="right") + + self.tree_text = ctk.CTkTextbox( + right_frame, + height=150, + font=ctk.CTkFont(family="Consolas", size=11) + ) + self.tree_text.pack(fill="both", expand=True, padx=5, pady=(0, 10)) + self.tree_text.configure(state="disabled") + + # Bottom - Log + log_label_frame = ctk.CTkFrame(self) + log_label_frame.pack(fill="x", padx=10, pady=(0, 5)) + ctk.CTkLabel( + log_label_frame, + text="Log", + font=ctk.CTkFont(weight="bold") + ).pack(side="left") + + self.log_text = ctk.CTkTextbox( + self, + height=120, + font=ctk.CTkFont(family="Consolas", size=10) + ) + self.log_text.pack(fill="x", padx=10, pady=(0, 10)) + self.log_text.configure(state="disabled") + + def _log(self, message: str, level: str = "info"): + """Add a message to the log.""" + timestamp = datetime.now().strftime("%H:%M:%S") + prefix = "" + if level == "error": + prefix = "ERROR " + elif level == "warning": + prefix = "WARN " + + self.log_text.configure(state="normal") + self.log_text.insert("end", f"{timestamp} {prefix}{message}\n") + self.log_text.see("end") + self.log_text.configure(state="disabled") + + def _init_project(self, path: str): + """Initialize project with given path.""" + if not os.path.isdir(path): + self._log(f"Verzeichnis existiert nicht: {path}", "error") + return False + + self.backup_manager = BackupManager(path) + self.executor = DCTPExecutor( + path, + self.backup_manager, + auto_renumber=self.settings.get("auto_renumber", True), + validate_checksums=self.settings.get("validate_checksum", True) + ) + self.settings["project_path"] = path + self._save_settings() + self._log(f"Projekt geladen: {path}") + self._refresh_file_tree() + return True + + def _browse_project(self): + """Open directory browser for project selection.""" + initial_dir = self.project_entry.get() or os.path.expanduser("~") + path = filedialog.askdirectory(initialdir=initial_dir) + if path: + self.project_entry.delete(0, "end") + self.project_entry.insert(0, path) + self._init_project(path) + + def _open_settings(self): + """Open settings dialog.""" + dialog = SettingsDialog(self, self.settings) + self.wait_window(dialog) + + if dialog.result: + old_theme = self.settings.get("theme") + self.settings.update(dialog.result) + self._save_settings() + + # Apply theme change + if dialog.result.get("theme") != old_theme: + ctk.set_appearance_mode(dialog.result["theme"]) + + # Reinitialize project with new settings + if self.settings.get("project_path"): + self._init_project(self.settings["project_path"]) + + self._log("Einstellungen gespeichert") + + def _analyze(self): + """Analyze input and show preview.""" + # Ensure project is initialized + project_path = self.project_entry.get() + if not project_path: + messagebox.showerror("Fehler", "Bitte waehle ein Projektverzeichnis") + return + + if not self.executor or self.settings.get("project_path") != project_path: + if not self._init_project(project_path): + return + + # Get input + input_text = self.input_text.get("1.0", "end-1c") + if not input_text.strip(): + self._log("Kein Input vorhanden", "warning") + return + + # Parse + self._log("Analysiere...") + result = self.parser.parse(input_text) + + # Handle errors + if result.has_errors: + self._log(f"{len(result.errors)} Parse-Fehler gefunden", "error") + for error in result.errors: + self._log(f" Zeile {error.line_number}: {error.message}", "error") + return + + if not result.operations: + self._log("Keine Operationen gefunden", "warning") + return + + self.current_operations = result.operations + self._log(f"{len(result.operations)} Operationen gefunden") + + # Generate previews + self.current_previews = self.executor.preview(result.operations) + + # Display previews + self._display_previews() + + # Enable execute button + self.execute_btn.configure(state="normal") + + def _display_previews(self): + """Display operation previews.""" + self.preview_text.configure(state="normal") + self.preview_text.delete("1.0", "end") + + self.diff_text.configure(state="normal") + self.diff_text.delete("1.0", "end") + + for preview in self.current_previews: + # Add to preview list + self.preview_text.insert("end", f"{preview.description}\n") + for warning in preview.warnings: + self.preview_text.insert("end", f" WARNING {warning}\n") + + # Add diff if available + if preview.diff and preview.diff.has_changes: + self.diff_text.insert("end", f"--- {preview.diff.filename} ---\n") + for line in preview.diff.lines: + if line.type == DiffType.ADDED: + self.diff_text.insert("end", f"+ {line.content}\n") + elif line.type == DiffType.REMOVED: + self.diff_text.insert("end", f"- {line.content}\n") + elif line.type == DiffType.UNCHANGED: + self.diff_text.insert("end", f" {line.content}\n") + self.diff_text.insert("end", "\n") + + self.preview_text.configure(state="disabled") + self.diff_text.configure(state="disabled") + + def _execute(self): + """Execute the analyzed operations.""" + if not self.current_operations: + self._log("Keine Operationen zum Ausfuehren", "warning") + return + + if not self.executor: + self._log("Kein Projekt initialisiert", "error") + return + + # Confirm + count = len(self.current_operations) + if not messagebox.askyesno( + "Bestaetigen", + f"{count} Operationen ausfuehren?" + ): + return + + self._log(f"Fuehre {count} Operationen aus...") + + # Execute + results = self.executor.execute(self.current_operations) + + # Log results + success_count = 0 + for result in results: + if result.status == ResultStatus.SUCCESS: + self._log(f"OK {result.message}") + success_count += 1 + elif result.status == ResultStatus.WARNING: + self._log(f"WARN {result.message}", "warning") + elif result.status == ResultStatus.ERROR: + self._log(f"ERROR {result.message}", "error") + + self._log(f"Abgeschlossen: {success_count}/{count} erfolgreich") + + # Clear current operations + self.current_operations = [] + self.current_previews = [] + self.execute_btn.configure(state="disabled") + + # Refresh file tree + self._refresh_file_tree() + + def _undo(self): + """Undo last operation.""" + if not self.backup_manager: + self._log("Kein Projekt initialisiert", "error") + return + + success, restored = self.backup_manager.restore_last() + + if success: + self._log(f"Undo erfolgreich: {len(restored)} Dateien wiederhergestellt") + for f in restored: + self._log(f" -> {f}") + self._refresh_file_tree() + else: + self._log("Kein Backup zum Wiederherstellen", "warning") + + def _clear(self): + """Clear input and preview areas.""" + self.input_text.delete("1.0", "end") + + self.preview_text.configure(state="normal") + self.preview_text.delete("1.0", "end") + self.preview_text.configure(state="disabled") + + self.diff_text.configure(state="normal") + self.diff_text.delete("1.0", "end") + self.diff_text.configure(state="disabled") + + self.current_operations = [] + self.current_previews = [] + self.execute_btn.configure(state="disabled") + + self._log("Eingabe geloescht") + + def _refresh_file_tree(self): + """Refresh the file tree display.""" + self.tree_text.configure(state="normal") + self.tree_text.delete("1.0", "end") + + project_path = self.project_entry.get() + if not project_path or not os.path.isdir(project_path): + self.tree_text.insert("end", "(Kein Projekt geladen)") + self.tree_text.configure(state="disabled") + return + + # Build simple tree + try: + self._add_tree_items(Path(project_path), 0) + except Exception as e: + self.tree_text.insert("end", f"Fehler: {e}") + + self.tree_text.configure(state="disabled") + + def _add_tree_items(self, path: Path, level: int, max_items: int = 100): + """Recursively add items to tree display.""" + if level > 5: # Limit depth + return + + indent = " " * level + + try: + items = sorted(path.iterdir(), key=lambda x: (not x.is_dir(), x.name.lower())) + count = 0 + + for item in items: + if count >= max_items: + self.tree_text.insert("end", f"{indent} ... (mehr Dateien)\n") + break + + # Skip hidden files and backup directory + if item.name.startswith('.'): + continue + + if item.is_dir(): + self.tree_text.insert("end", f"{indent}DIR {item.name}/\n") + self._add_tree_items(item, level + 1, max_items=20) + else: + self.tree_text.insert("end", f"{indent}FILE {item.name}\n") + + count += 1 + + except PermissionError: + self.tree_text.insert("end", f"{indent} (Zugriff verweigert)\n") + + +def main(): + """Main entry point.""" + app = DCTPApp() + app.mainloop() + + +if __name__ == "__main__": + main() diff --git a/dctp/dctp_parser.py b/dctp/dctp_parser.py new file mode 100644 index 0000000..f6bbed1 --- /dev/null +++ b/dctp/dctp_parser.py @@ -0,0 +1,307 @@ +""" +DCTP Parser - Parses DCTP control commands and code blocks. + +Handles line-numbered code with language-specific comment formats: +- Python/Shell: #Z1 +- JavaScript/Java/C/C++: //Z1 +- HTML: +- CSS: /*Z1*/ +- SQL: --Z1 +""" + +import re +from dataclasses import dataclass, field +from typing import Optional +from enum import Enum + + +class OperationType(Enum): + NEW = "NEW" + DELETE = "DELETE" + INSERT_AFTER = "INSERT_AFTER" + REPLACE = "REPLACE" + RENUMBER = "RENUMBER" + + +@dataclass +class Operation: + """Represents a single DCTP operation.""" + type: OperationType + file: str + start_line: Optional[int] = None + end_line: Optional[int] = None + content: list[str] = field(default_factory=list) + checksum: Optional[str] = None + raw_content: list[str] = field(default_factory=list) # Content with line numbers + + def __str__(self) -> str: + if self.type == OperationType.NEW: + return f"CREATE {self.file} ({len(self.content)} lines)" + elif self.type == OperationType.DELETE: + return f"DELETE {self.file} Z{self.start_line}-Z{self.end_line}" + elif self.type == OperationType.INSERT_AFTER: + return f"INSERT_AFTER {self.file} Z{self.start_line} ({len(self.content)} lines)" + elif self.type == OperationType.REPLACE: + return f"REPLACE {self.file} Z{self.start_line}-Z{self.end_line} ({len(self.content)} lines)" + elif self.type == OperationType.RENUMBER: + return f"RENUMBER {self.file}" + return f"{self.type.value} {self.file}" + + +@dataclass +class ParseError: + """Represents a parsing error.""" + line_number: int + line_content: str + message: str + + +@dataclass +class ParseResult: + """Result of parsing DCTP input.""" + operations: list[Operation] + errors: list[ParseError] + + @property + def has_errors(self) -> bool: + return len(self.errors) > 0 + + +class DCTPParser: + """Parser for DCTP (Delta Code Transfer Protocol) format.""" + + # Regex patterns for line number markers in different languages + LINE_NUMBER_PATTERNS = [ + re.compile(r'\s*#Z(\d+)\s*$'), # Python, Shell + re.compile(r'\s*//Z(\d+)\s*$'), # JavaScript, Java, C, C++ + re.compile(r'\s*\s*$'), # HTML + re.compile(r'\s*/\*Z(\d+)\*/\s*$'), # CSS + re.compile(r'\s*--Z(\d+)\s*$'), # SQL + ] + + # Control command patterns + FILE_PATTERN = re.compile(r'^###FILE:(.+)$') + NEW_PATTERN = re.compile(r'^###NEW\s*$') + DELETE_PATTERN = re.compile(r'^###DELETE:Z(\d+)(?:-Z(\d+))?\s*$') + INSERT_AFTER_PATTERN = re.compile(r'^###INSERT_AFTER:Z(\d+)\s*$') + REPLACE_PATTERN = re.compile(r'^###REPLACE:Z(\d+)(?:-Z(\d+))?\s*$') + END_PATTERN = re.compile(r'^###END\s*$') + RENUMBER_PATTERN = re.compile(r'^###RENUMBER\s*$') + CHECKSUM_PATTERN = re.compile(r'^###CHECKSUM:([a-fA-F0-9]+)\s*$') + + def parse(self, text: str) -> ParseResult: + """ + Parse DCTP formatted text into a list of operations. + + Args: + text: The DCTP formatted input text + + Returns: + ParseResult containing operations and any errors + """ + operations: list[Operation] = [] + errors: list[ParseError] = [] + + current_file: Optional[str] = None + current_op: Optional[Operation] = None + buffer: list[str] = [] + raw_buffer: list[str] = [] + + lines = text.split('\n') + + for line_num, line in enumerate(lines, 1): + # Skip empty lines outside of content blocks + if not line.strip() and current_op is None: + continue + + # Check for FILE command + file_match = self.FILE_PATTERN.match(line) + if file_match: + current_file = file_match.group(1).strip() + continue + + # Check for NEW command + if self.NEW_PATTERN.match(line): + if current_file is None: + errors.append(ParseError(line_num, line, "###NEW without ###FILE")) + continue + current_op = Operation(type=OperationType.NEW, file=current_file) + buffer = [] + raw_buffer = [] + continue + + # Check for DELETE command + delete_match = self.DELETE_PATTERN.match(line) + if delete_match: + if current_file is None: + errors.append(ParseError(line_num, line, "###DELETE without ###FILE")) + continue + start = int(delete_match.group(1)) + end = int(delete_match.group(2)) if delete_match.group(2) else start + operations.append(Operation( + type=OperationType.DELETE, + file=current_file, + start_line=start, + end_line=end + )) + continue + + # Check for INSERT_AFTER command + insert_match = self.INSERT_AFTER_PATTERN.match(line) + if insert_match: + if current_file is None: + errors.append(ParseError(line_num, line, "###INSERT_AFTER without ###FILE")) + continue + current_op = Operation( + type=OperationType.INSERT_AFTER, + file=current_file, + start_line=int(insert_match.group(1)) + ) + buffer = [] + raw_buffer = [] + continue + + # Check for REPLACE command + replace_match = self.REPLACE_PATTERN.match(line) + if replace_match: + if current_file is None: + errors.append(ParseError(line_num, line, "###REPLACE without ###FILE")) + continue + start = int(replace_match.group(1)) + end = int(replace_match.group(2)) if replace_match.group(2) else start + current_op = Operation( + type=OperationType.REPLACE, + file=current_file, + start_line=start, + end_line=end + ) + buffer = [] + raw_buffer = [] + continue + + # Check for END command + if self.END_PATTERN.match(line): + if current_op: + current_op.content = buffer.copy() + current_op.raw_content = raw_buffer.copy() + operations.append(current_op) + current_op = None + buffer = [] + raw_buffer = [] + continue + + # Check for RENUMBER command + if self.RENUMBER_PATTERN.match(line): + if current_file is None: + errors.append(ParseError(line_num, line, "###RENUMBER without ###FILE")) + continue + operations.append(Operation(type=OperationType.RENUMBER, file=current_file)) + continue + + # Check for CHECKSUM command + checksum_match = self.CHECKSUM_PATTERN.match(line) + if checksum_match: + if current_op: + current_op.checksum = checksum_match.group(1) + continue + + # Regular code line - add to buffer if we're in an operation + if current_op is not None: + raw_buffer.append(line) + clean_line = self._remove_line_number(line) + buffer.append(clean_line) + + # Handle unclosed operation + if current_op is not None: + errors.append(ParseError( + len(lines), + "", + f"Unclosed operation: {current_op.type.value} for {current_op.file}" + )) + + return ParseResult(operations=operations, errors=errors) + + def _remove_line_number(self, line: str) -> str: + """Remove line number marker from end of line.""" + for pattern in self.LINE_NUMBER_PATTERNS: + match = pattern.search(line) + if match: + return line[:match.start()] + return line + + def extract_line_number(self, line: str) -> Optional[int]: + """Extract line number from a code line.""" + for pattern in self.LINE_NUMBER_PATTERNS: + match = pattern.search(line) + if match: + return int(match.group(1)) + return None + + @staticmethod + def get_line_number_suffix(filename: str, line_num: int) -> str: + """Get the appropriate line number suffix for a file type.""" + ext = filename.rsplit('.', 1)[-1].lower() if '.' in filename else '' + + if ext in ('py', 'sh', 'bash', 'zsh', 'yaml', 'yml', 'toml', 'ini', 'conf', 'rb', 'pl'): + return f" #Z{line_num}" + elif ext in ('js', 'ts', 'jsx', 'tsx', 'java', 'c', 'cpp', 'h', 'hpp', 'cs', 'go', 'rs', 'swift', 'kt', 'scala'): + return f" //Z{line_num}" + elif ext in ('html', 'htm', 'xml', 'svg'): + return f" " + elif ext in ('css', 'scss', 'sass', 'less'): + return f" /*Z{line_num}*/" + elif ext in ('sql',): + return f" --Z{line_num}" + else: + # Default to Python style + return f" #Z{line_num}" + + +def main(): + """Test the parser with example input.""" + test_input = """###FILE:src/calculator.py +###NEW +def add(a, b): #Z1 + return a + b #Z2 + #Z3 +def multiply(a, b): #Z4 + return a * b #Z5 +###END + +###FILE:src/calculator.py +###REPLACE:Z4-Z5 +def multiply(a, b): #Z4 + \"\"\"Multipliziert zwei Zahlen.\"\"\" #Z5 + return a * b #Z6 +###END +###RENUMBER + +###FILE:src/calculator.py +###INSERT_AFTER:Z2 + #Z3 +def subtract(a, b): #Z4 + return a - b #Z5 +###END +###RENUMBER + +###FILE:src/calculator.py +###DELETE:Z10-Z15 +###RENUMBER +""" + + parser = DCTPParser() + result = parser.parse(test_input) + + print("Operations found:") + for op in result.operations: + print(f" {op}") + + if result.has_errors: + print("\nErrors:") + for error in result.errors: + print(f" Line {error.line_number}: {error.message}") + print(f" {error.line_content}") + + +if __name__ == "__main__": + main() diff --git a/dctp/requirements.txt b/dctp/requirements.txt new file mode 100644 index 0000000..2dca37e --- /dev/null +++ b/dctp/requirements.txt @@ -0,0 +1 @@ +customtkinter>=5.2.0