Add purchasing full load cache

This commit is contained in:
2026-06-05 12:43:12 +02:00
parent b1bff57370
commit 43250a4abc
12 changed files with 1040 additions and 30 deletions
@@ -233,4 +233,61 @@ CREATE TABLE NavigationMenuItems (
IsSystem INTEGER NOT NULL DEFAULT 1,
SortOrder INTEGER NOT NULL DEFAULT 0
);";
internal static string GetPurchasingEkkoCacheCreateSql() => @"
CREATE TABLE PurchasingEkkoCache (
Ebeln TEXT NOT NULL PRIMARY KEY,
Bedat TEXT NULL,
Aedat TEXT NULL,
Lifnr TEXT NOT NULL DEFAULT '',
Bukrs TEXT NOT NULL DEFAULT '',
Bsart TEXT NOT NULL DEFAULT '',
RawJson TEXT NOT NULL DEFAULT '',
LastLoadedAtUtc TEXT NOT NULL
);";
internal static string GetPurchasingEkpoCacheCreateSql() => @"
CREATE TABLE PurchasingEkpoCache (
Ebeln TEXT NOT NULL,
Ebelp TEXT NOT NULL,
Matnr TEXT NOT NULL DEFAULT '',
Txz01 TEXT NOT NULL DEFAULT '',
Matkl TEXT NOT NULL DEFAULT '',
Menge TEXT NOT NULL DEFAULT '0',
Meins TEXT NOT NULL DEFAULT '',
Netwr TEXT NOT NULL DEFAULT '0',
Loekz TEXT NOT NULL DEFAULT '',
RawJson TEXT NOT NULL DEFAULT '',
LastLoadedAtUtc TEXT NOT NULL,
PRIMARY KEY (Ebeln, Ebelp)
);";
internal static string GetPurchasingEketCacheCreateSql() => @"
CREATE TABLE PurchasingEketCache (
Ebeln TEXT NOT NULL,
Ebelp TEXT NOT NULL,
Etenr TEXT NOT NULL,
Eindt TEXT NULL,
Menge TEXT NOT NULL DEFAULT '0',
Wemng TEXT NOT NULL DEFAULT '0',
RawJson TEXT NOT NULL DEFAULT '',
LastLoadedAtUtc TEXT NOT NULL,
PRIMARY KEY (Ebeln, Ebelp, Etenr)
);";
internal static string GetPurchasingSyncStateCreateSql() => @"
CREATE TABLE PurchasingSyncState (
Id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
Mode TEXT NOT NULL DEFAULT '',
Status TEXT NOT NULL DEFAULT '',
StartedAtUtc TEXT NULL,
CompletedAtUtc TEXT NULL,
FromDate TEXT NULL,
ToDate TEXT NULL,
LastSuccessfulDeltaAtUtc TEXT NULL,
EkkoRows INTEGER NOT NULL DEFAULT 0,
EkpoRows INTEGER NOT NULL DEFAULT 0,
EketRows INTEGER NOT NULL DEFAULT 0,
Message TEXT NOT NULL DEFAULT ''
);";
}
@@ -46,6 +46,7 @@ public class DatabaseSchemaMaintenanceService : IDatabaseSchemaMaintenanceServic
EnsureManualExcelColumnMappingTable(db);
EnsureCentralSalesRecordTable(db);
EnsureNavigationMenuItemTable(db);
EnsurePurchasingCacheTables(db);
AddColumnIfMissing(db, "CentralSalesRecords", "DocumentEntry", "INTEGER NOT NULL DEFAULT 0");
AddColumnIfMissing(db, "CentralSalesRecords", "DocumentCurrency", "TEXT NOT NULL DEFAULT ''");
AddColumnIfMissing(db, "CentralSalesRecords", "DocumentTotalForeignCurrency", "TEXT NOT NULL DEFAULT '0'");
@@ -284,6 +285,34 @@ CREATE TABLE IF NOT EXISTS FieldTransformationRules (
cmd.ExecuteNonQuery();
}
private static void EnsurePurchasingCacheTables(AppDbContext db)
{
var conn = db.Database.GetDbConnection();
if (conn.State != System.Data.ConnectionState.Open)
conn.Open();
foreach (var createSql in new[]
{
DatabaseSchemaSql.GetPurchasingEkkoCacheCreateSql(),
DatabaseSchemaSql.GetPurchasingEkpoCacheCreateSql(),
DatabaseSchemaSql.GetPurchasingEketCacheCreateSql(),
DatabaseSchemaSql.GetPurchasingSyncStateCreateSql()
})
{
using var cmd = conn.CreateCommand();
cmd.CommandText = createSql.Replace("CREATE TABLE", "CREATE TABLE IF NOT EXISTS");
cmd.ExecuteNonQuery();
}
using var ekpoIndex = conn.CreateCommand();
ekpoIndex.CommandText = "CREATE INDEX IF NOT EXISTS IX_PurchasingEkpoCache_Matkl ON PurchasingEkpoCache (Matkl);";
ekpoIndex.ExecuteNonQuery();
using var eketDateIndex = conn.CreateCommand();
eketDateIndex.CommandText = "CREATE INDEX IF NOT EXISTS IX_PurchasingEketCache_Eindt ON PurchasingEketCache (Eindt);";
eketDateIndex.ExecuteNonQuery();
}
private static void EnsureSapSourceTable(AppDbContext db)
{
var conn = db.Database.GetDbConnection();
@@ -140,6 +140,14 @@ public class DatabaseSeedService : IDatabaseSeedService
if (string.IsNullOrWhiteSpace(existing.ItemType)) existing.ItemType = item.ItemType;
if (string.IsNullOrWhiteSpace(existing.Match)) existing.Match = item.Match;
if (string.IsNullOrWhiteSpace(existing.RequiredPolicy)) existing.RequiredPolicy = item.RequiredPolicy;
if (existing.Key == "purchasing-ideas")
{
existing.ItemType = item.ItemType;
existing.Href = item.Href;
existing.Match = item.Match;
existing.Icon = item.Icon;
existing.IsExpanded = item.IsExpanded;
}
existing.IsSystem = true;
changed = true;
}
@@ -185,7 +193,13 @@ public class DatabaseSeedService : IDatabaseSeedService
Link("purchasing-open-orders", "purchasing", "Offene Bestellungen", "Open orders", "PendingActions", "einkauf/offene-bestellungen", 30, "All"),
Link("purchasing-contracts", "purchasing", "Kontrakte", "Contracts", "Assignment", "einkauf/kontrakte", 40, "All"),
Link("purchasing-suppliers", "purchasing", "Lieferanten", "Suppliers", "Verified", "einkauf/lieferanten", 50, "All"),
Link("purchasing-ideas", "purchasing", "Ideen", "Ideas", "Lightbulb", "einkauf/ideen", 60, "All"),
Group("purchasing-ideas", "purchasing", "Ideen", "Ideas", "Lightbulb", 60, expanded: true),
Link("purchasing-ideas-overview", "purchasing-ideas", "Uebersicht", "Overview", "Lightbulb", "einkauf/ideen", 10, "All"),
Link("purchasing-idea-data-service", "purchasing-ideas", "Einkauf-Datenservice", "Purchasing data service", "Storage", "einkauf/ideen/datenservice", 20, "All"),
Link("purchasing-idea-delivery-risk", "purchasing-ideas", "Liefertermin-Risiko", "Delivery due-date risk", "PendingActions", "einkauf/ideen/liefertermin-risiko", 30, "All"),
Link("purchasing-idea-price-variance", "purchasing-ideas", "Preisabweichung", "Price variance", "TrendingUp", "einkauf/ideen/preisabweichung", 40, "All"),
Link("purchasing-idea-spend-concentration", "purchasing-ideas", "Spend-Konzentration", "Spend concentration", "PieChart", "einkauf/ideen/spend-konzentration", 50, "All"),
Link("purchasing-idea-data-quality", "purchasing-ideas", "Datenqualitaet", "Data quality", "FactCheck", "einkauf/ideen/datenqualitaet", 60, "All"),
Link("purchasing-kpi-catalog", "purchasing", "Kennzahlen-Katalog", "KPI catalogue", "Checklist", "einkauf/kennzahlen", 70, "All"),
Link("purchasing-pbix", "purchasing", "PBIX Vorlage", "PBIX template", "InsertChart", "einkauf/pbix", 80, "All"),
Link("purchasing-3d", "purchasing", "3D Simulation", "3D simulation", "ViewInAr", "einkauf/3d", 90, "All"),
@@ -16,6 +16,9 @@ public sealed class PurchasingDashboardLiveState
public DateTime? LatestOrderDate { get; set; }
public int PositionSampleCount { get; set; }
public int ScheduleSampleCount { get; set; }
public bool UsesCache { get; set; }
public string CacheStatus { get; set; } = string.Empty;
public DateTime? CacheCompletedAtUtc { get; set; }
public decimal SpendChfSample { get; set; }
public decimal OpenQuantitySample { get; set; }
public decimal OpenValueSample { get; set; }
@@ -0,0 +1,24 @@
namespace TrafagSalesExporter.Services;
public interface IPurchasingDataRefreshService
{
Task<PurchasingDataRefreshStatus> GetStatusAsync(CancellationToken cancellationToken = default);
Task<PurchasingDataRefreshStatus> RunFullLoadAsync(DateTime? fromDate = null, CancellationToken cancellationToken = default);
Task<PurchasingDataRefreshStatus> RunDeltaAsync(DateTime? fromDate = null, CancellationToken cancellationToken = default);
}
public sealed class PurchasingDataRefreshStatus
{
public string Mode { get; set; } = string.Empty;
public string Status { get; set; } = string.Empty;
public DateTime? StartedAtUtc { get; set; }
public DateTime? CompletedAtUtc { get; set; }
public DateTime? FromDate { get; set; }
public DateTime? ToDate { get; set; }
public DateTime? LastSuccessfulDeltaAtUtc { get; set; }
public int EkkoRows { get; set; }
public int EkpoRows { get; set; }
public int EketRows { get; set; }
public string Message { get; set; } = string.Empty;
public bool IsComplete => string.Equals(Status, "Success", StringComparison.OrdinalIgnoreCase);
}
@@ -33,7 +33,9 @@ public static class NavigationIconResolver
"Settings" => Icons.Material.Filled.Settings,
"ShoppingCart" => Icons.Material.Filled.ShoppingCart,
"Speed" => Icons.Material.Filled.Speed,
"Storage" => Icons.Material.Filled.Storage,
"Transform" => Icons.Material.Filled.Transform,
"TrendingUp" => Icons.Material.Filled.TrendingUp,
"Tune" => Icons.Material.Filled.Tune,
"UploadFile" => Icons.Material.Filled.UploadFile,
"Verified" => Icons.Material.Filled.Verified,
@@ -2,6 +2,7 @@ using System.Globalization;
using System.Net.Http.Headers;
using System.Text;
using System.Text.Json;
using Microsoft.Data.Sqlite;
using Microsoft.EntityFrameworkCore;
using TrafagSalesExporter.Data;
@@ -23,6 +24,9 @@ public sealed class PurchasingDashboardService : IPurchasingDashboardService
try
{
await using var db = await _dbFactory.CreateDbContextAsync(cancellationToken);
if (await TryLoadCacheStateAsync(db, state, cancellationToken))
return state;
var sap = await db.SourceSystemDefinitions.AsNoTracking().FirstOrDefaultAsync(x => x.Code == "SAP", cancellationToken);
var site = await db.Sites.AsNoTracking().FirstOrDefaultAsync(x => x.TSC == PurchasingDataSourcePageService.PurchasingTsc, cancellationToken);
if (sap is null || site is null)
@@ -104,6 +108,85 @@ public sealed class PurchasingDashboardService : IPurchasingDashboardService
return state;
}
private static async Task<bool> TryLoadCacheStateAsync(AppDbContext db, PurchasingDashboardLiveState state, CancellationToken cancellationToken)
{
var conn = (SqliteConnection)db.Database.GetDbConnection();
if (conn.State != System.Data.ConnectionState.Open)
await conn.OpenAsync(cancellationToken);
var ekkoRows = await ExecuteScalarIntAsync(conn, "SELECT COUNT(1) FROM PurchasingEkkoCache;", cancellationToken);
var ekpoRows = await ExecuteScalarIntAsync(conn, "SELECT COUNT(1) FROM PurchasingEkpoCache;", cancellationToken);
var eketRows = await ExecuteScalarIntAsync(conn, "SELECT COUNT(1) FROM PurchasingEketCache;", cancellationToken);
if (ekkoRows <= 0 || ekpoRows <= 0 || eketRows <= 0)
return false;
var latestStatus = await ReadCacheStatusAsync(conn, cancellationToken);
state.UsesCache = true;
state.SapReachable = true;
state.EkkoLoaded = true;
state.EkpoLoaded = true;
state.EketLoaded = true;
state.PurchaseOrderCount = ekkoRows;
state.PositionSampleCount = ekpoRows;
state.ScheduleSampleCount = eketRows;
state.SupplierCount = await ExecuteScalarIntAsync(conn, "SELECT COUNT(DISTINCT Lifnr) FROM PurchasingEkkoCache WHERE Lifnr <> '';", cancellationToken);
state.LatestOrderDate = await ExecuteScalarDateAsync(conn, "SELECT MAX(Bedat) FROM PurchasingEkkoCache;", cancellationToken);
state.SpendChfSample = await ExecuteScalarDecimalAsync(conn, "SELECT COALESCE(SUM(CAST(Netwr AS REAL)), 0) FROM PurchasingEkpoCache WHERE Loekz = '';", cancellationToken);
state.OpenQuantitySample = await ExecuteScalarDecimalAsync(conn, "SELECT COALESCE(SUM(MAX(CAST(e.Menge AS REAL) - CAST(e.Wemng AS REAL), 0)), 0) FROM PurchasingEketCache e;", cancellationToken);
state.OpenValueSample = await ExecuteScalarDecimalAsync(conn, @"
SELECT COALESCE(SUM(MAX(CAST(e.Menge AS REAL) - CAST(e.Wemng AS REAL), 0) *
CASE WHEN CAST(p.Menge AS REAL) = 0 THEN 0 ELSE CAST(p.Netwr AS REAL) / CAST(p.Menge AS REAL) END), 0)
FROM PurchasingEketCache e
LEFT JOIN PurchasingEkpoCache p ON p.Ebeln = e.Ebeln AND p.Ebelp = e.Ebelp
WHERE COALESCE(p.Loekz, '') = '';", cancellationToken);
state.ContractValueSample = state.OpenValueSample;
state.TopSupplierLabel = await ExecuteTopLabelAsync(conn, @"
SELECT COALESCE(k.Lifnr, 'ohne Lieferant') AS Label, SUM(CAST(p.Netwr AS REAL)) AS Value
FROM PurchasingEkpoCache p
LEFT JOIN PurchasingEkkoCache k ON k.Ebeln = p.Ebeln
WHERE p.Loekz = ''
GROUP BY COALESCE(k.Lifnr, 'ohne Lieferant')
ORDER BY Value DESC
LIMIT 1;", "Lieferant", cancellationToken);
state.TopMaterialGroupLabel = await ExecuteTopLabelAsync(conn, @"
SELECT COALESCE(NULLIF(Matkl, ''), 'ohne Warengruppe') AS Label, SUM(CAST(Netwr AS REAL)) AS Value
FROM PurchasingEkpoCache
WHERE Loekz = ''
GROUP BY COALESCE(NULLIF(Matkl, ''), 'ohne Warengruppe')
ORDER BY Value DESC
LIMIT 1;", "Warengruppe", cancellationToken);
state.TopArticleLabel = await ExecuteTopLabelAsync(conn, @"
SELECT COALESCE(NULLIF(Matnr, ''), NULLIF(Txz01, ''), 'ohne Artikel') AS Label, SUM(CAST(Netwr AS REAL)) AS Value
FROM PurchasingEkpoCache
WHERE Loekz = ''
GROUP BY COALESCE(NULLIF(Matnr, ''), NULLIF(Txz01, ''), 'ohne Artikel')
ORDER BY Value DESC
LIMIT 1;", "Artikel", cancellationToken);
state.SpendChartRows = await ExecuteChartRowsAsync(conn, @"
SELECT 'Lief. ' || COALESCE(NULLIF(k.Lifnr, ''), 'ohne Lieferant') AS Label, SUM(CAST(p.Netwr AS REAL)) AS Value
FROM PurchasingEkpoCache p
LEFT JOIN PurchasingEkkoCache k ON k.Ebeln = p.Ebeln
WHERE p.Loekz = ''
GROUP BY COALESCE(NULLIF(k.Lifnr, ''), 'ohne Lieferant')
ORDER BY Value DESC
LIMIT 6;", cancellationToken);
state.OpenValueChartRows = await ExecuteChartRowsAsync(conn, @"
SELECT COALESCE(substr(e.Eindt, 1, 7), 'ohne Termin') AS Label,
SUM(MAX(CAST(e.Menge AS REAL) - CAST(e.Wemng AS REAL), 0) *
CASE WHEN CAST(p.Menge AS REAL) = 0 THEN 0 ELSE CAST(p.Netwr AS REAL) / CAST(p.Menge AS REAL) END) AS Value
FROM PurchasingEketCache e
LEFT JOIN PurchasingEkpoCache p ON p.Ebeln = e.Ebeln AND p.Ebelp = e.Ebelp
WHERE COALESCE(p.Loekz, '') = ''
GROUP BY COALESCE(substr(e.Eindt, 1, 7), 'ohne Termin')
ORDER BY Label
LIMIT 6;", cancellationToken);
state.ContractChartRows = state.OpenValueChartRows.ToList();
state.CacheStatus = latestStatus.Status;
state.CacheCompletedAtUtc = latestStatus.CompletedAtUtc;
state.Message = $"Einkauf Cache geladen: EKKO={ekkoRows:N0}, EKPO={ekpoRows:N0}, EKET={eketRows:N0}. {latestStatus.Message}";
return true;
}
private static void ApplyEkpoMetrics(
PurchasingDashboardLiveState state,
List<Dictionary<string, object?>> ekkoRows,
@@ -242,6 +325,74 @@ public sealed class PurchasingDashboardService : IPurchasingDashboardService
return int.TryParse(text.Trim(), NumberStyles.Integer, CultureInfo.InvariantCulture, out var value) ? value : null;
}
private static async Task<int> ExecuteScalarIntAsync(SqliteConnection conn, string sql, CancellationToken cancellationToken)
{
await using var command = conn.CreateCommand();
command.CommandText = sql;
var value = await command.ExecuteScalarAsync(cancellationToken);
return Convert.ToInt32(value ?? 0, CultureInfo.InvariantCulture);
}
private static async Task<decimal> ExecuteScalarDecimalAsync(SqliteConnection conn, string sql, CancellationToken cancellationToken)
{
await using var command = conn.CreateCommand();
command.CommandText = sql;
var value = await command.ExecuteScalarAsync(cancellationToken);
return Convert.ToDecimal(value ?? 0, CultureInfo.InvariantCulture);
}
private static async Task<DateTime?> ExecuteScalarDateAsync(SqliteConnection conn, string sql, CancellationToken cancellationToken)
{
await using var command = conn.CreateCommand();
command.CommandText = sql;
var value = Convert.ToString(await command.ExecuteScalarAsync(cancellationToken), CultureInfo.InvariantCulture);
return string.IsNullOrWhiteSpace(value) ? null : TryParseSapDate(value);
}
private static async Task<string> ExecuteTopLabelAsync(SqliteConnection conn, string sql, string fallback, CancellationToken cancellationToken)
{
await using var command = conn.CreateCommand();
command.CommandText = sql;
await using var reader = await command.ExecuteReaderAsync(cancellationToken);
if (!await reader.ReadAsync(cancellationToken))
return fallback;
var label = reader.GetString(0);
var value = Convert.ToDecimal(reader.GetValue(1), CultureInfo.InvariantCulture);
return $"{label}: CHF {value:N0}";
}
private static async Task<List<PurchasingLiveChartPoint>> ExecuteChartRowsAsync(SqliteConnection conn, string sql, CancellationToken cancellationToken)
{
var rows = new List<PurchasingLiveChartPoint>();
await using var command = conn.CreateCommand();
command.CommandText = sql;
await using var reader = await command.ExecuteReaderAsync(cancellationToken);
while (await reader.ReadAsync(cancellationToken))
{
var label = reader.GetString(0);
var value = Convert.ToDecimal(reader.GetValue(1), CultureInfo.InvariantCulture);
rows.Add(new PurchasingLiveChartPoint(label, value));
}
return rows;
}
private static async Task<(string Status, DateTime? CompletedAtUtc, string Message)> ReadCacheStatusAsync(SqliteConnection conn, CancellationToken cancellationToken)
{
await using var command = conn.CreateCommand();
command.CommandText = "SELECT Status, CompletedAtUtc, Message FROM PurchasingSyncState ORDER BY Id DESC LIMIT 1;";
await using var reader = await command.ExecuteReaderAsync(cancellationToken);
if (!await reader.ReadAsync(cancellationToken))
return ("Cache", null, string.Empty);
var completedText = reader.IsDBNull(1) ? string.Empty : reader.GetString(1);
var completed = DateTime.TryParse(completedText, CultureInfo.InvariantCulture, DateTimeStyles.AssumeUniversal, out var parsed)
? parsed
: (DateTime?)null;
return (reader.GetString(0), completed, reader.GetString(2));
}
private static object? ConvertJsonValue(JsonElement value) => value.ValueKind switch
{
JsonValueKind.String => value.GetString(),
@@ -0,0 +1,382 @@
using System.Data;
using System.Globalization;
using System.Net.Http.Headers;
using System.Text;
using System.Text.Json;
using Microsoft.Data.Sqlite;
using Microsoft.EntityFrameworkCore;
using TrafagSalesExporter.Data;
namespace TrafagSalesExporter.Services;
public sealed class PurchasingDataRefreshService : IPurchasingDataRefreshService
{
private const int PageSize = 1000;
private readonly IDbContextFactory<AppDbContext> _dbFactory;
private readonly IAppEventLogService _logService;
public PurchasingDataRefreshService(IDbContextFactory<AppDbContext> dbFactory, IAppEventLogService logService)
{
_dbFactory = dbFactory;
_logService = logService;
}
public async Task<PurchasingDataRefreshStatus> GetStatusAsync(CancellationToken cancellationToken = default)
{
await using var db = await _dbFactory.CreateDbContextAsync(cancellationToken);
var conn = (SqliteConnection)db.Database.GetDbConnection();
if (conn.State != ConnectionState.Open)
await conn.OpenAsync(cancellationToken);
var status = await ReadLatestStatusAsync(conn, cancellationToken);
status.EkkoRows = await CountTableAsync(conn, "PurchasingEkkoCache", cancellationToken);
status.EkpoRows = await CountTableAsync(conn, "PurchasingEkpoCache", cancellationToken);
status.EketRows = await CountTableAsync(conn, "PurchasingEketCache", cancellationToken);
return status;
}
public async Task<PurchasingDataRefreshStatus> RunFullLoadAsync(DateTime? fromDate = null, CancellationToken cancellationToken = default)
{
var started = DateTime.UtcNow;
await WriteStatusAsync("Full", "Running", started, null, fromDate, null, null, 0, 0, 0, "Full Load gestartet.", cancellationToken);
await _logService.WriteAsync("Purchasing", "Einkauf Full Load gestartet", details: fromDate?.ToString("yyyy-MM-dd", CultureInfo.InvariantCulture));
try
{
var connection = await ResolveConnectionAsync(cancellationToken);
using var client = CreateClient(connection.Username, connection.Password);
var nowText = DateTime.UtcNow.ToString("O", CultureInfo.InvariantCulture);
var ekkoFilter = fromDate.HasValue ? $"Bedat ge '{fromDate.Value:yyyy-MM-dd}'" : string.Empty;
var ekkoRows = await ReadAllRowsAsync(client, connection.BaseUrl, "EKKOSet", "Ebeln,Bedat,Aedat,Lifnr,Bukrs,Konnr,Waers,Wkurs", ekkoFilter, "Ebeln", cancellationToken);
var ekpoRows = await ReadAllRowsAsync(client, connection.BaseUrl, "EKPOSet", "Ebeln,Ebelp,Matnr,Txz01,Matkl,Menge,Ktmng,Netwr,Loekz,Bukrs,Werks", string.Empty, "Ebeln,Ebelp", cancellationToken);
var eketRows = await ReadAllRowsAsync(client, connection.BaseUrl, "eketSet", "Ebeln,Ebelp,Etenr,Eindt,Menge,Wemng", string.Empty, "Ebeln,Ebelp,Etenr", cancellationToken);
await using var db = await _dbFactory.CreateDbContextAsync(cancellationToken);
var conn = (SqliteConnection)db.Database.GetDbConnection();
if (conn.State != ConnectionState.Open)
await conn.OpenAsync(cancellationToken);
await using var transaction = (SqliteTransaction)await conn.BeginTransactionAsync(cancellationToken);
await ExecuteAsync(conn, transaction, "DELETE FROM PurchasingEkkoCache;", cancellationToken);
await ExecuteAsync(conn, transaction, "DELETE FROM PurchasingEkpoCache;", cancellationToken);
await ExecuteAsync(conn, transaction, "DELETE FROM PurchasingEketCache;", cancellationToken);
await UpsertEkkoAsync(conn, transaction, ekkoRows, nowText, cancellationToken);
await UpsertEkpoAsync(conn, transaction, ekpoRows, nowText, cancellationToken);
await UpsertEketAsync(conn, transaction, eketRows, nowText, cancellationToken);
await transaction.CommitAsync(cancellationToken);
var completed = DateTime.UtcNow;
var message = $"Full Load abgeschlossen: EKKO={ekkoRows.Count:N0}, EKPO={ekpoRows.Count:N0}, EKET={eketRows.Count:N0}.";
await WriteStatusAsync("Full", "Success", started, completed, fromDate, null, completed, ekkoRows.Count, ekpoRows.Count, eketRows.Count, message, cancellationToken);
await _logService.WriteAsync("Purchasing", "Einkauf Full Load erfolgreich", details: message);
return await GetStatusAsync(cancellationToken);
}
catch (Exception ex)
{
var message = $"Full Load fehlgeschlagen: {ex.Message}";
await WriteStatusAsync("Full", "Error", started, DateTime.UtcNow, fromDate, null, null, 0, 0, 0, message, cancellationToken);
await _logService.WriteAsync("Purchasing", "Einkauf Full Load fehlgeschlagen", "Error", details: ex.ToString());
return await GetStatusAsync(cancellationToken);
}
}
public async Task<PurchasingDataRefreshStatus> RunDeltaAsync(DateTime? fromDate = null, CancellationToken cancellationToken = default)
{
var current = await GetStatusAsync(cancellationToken);
var deltaFrom = fromDate ?? current.LastSuccessfulDeltaAtUtc ?? current.CompletedAtUtc ?? DateTime.UtcNow.AddDays(-7);
var started = DateTime.UtcNow;
await WriteStatusAsync("Delta", "Running", started, null, deltaFrom, null, current.LastSuccessfulDeltaAtUtc, current.EkkoRows, current.EkpoRows, current.EketRows, "Delta gestartet.", cancellationToken);
try
{
var connection = await ResolveConnectionAsync(cancellationToken);
using var client = CreateClient(connection.Username, connection.Password);
var filter = $"Aedat ge '{deltaFrom:yyyy-MM-dd}'";
var changedEkko = await ReadAllRowsAsync(client, connection.BaseUrl, "EKKOSet", "Ebeln,Bedat,Aedat,Lifnr,Bukrs,Konnr,Waers,Wkurs", filter, "Ebeln", cancellationToken);
var ebelnKeys = changedEkko
.Select(row => GetText(row, "Ebeln"))
.Where(value => !string.IsNullOrWhiteSpace(value))
.Distinct(StringComparer.OrdinalIgnoreCase)
.ToList();
var ekpoRows = new List<Dictionary<string, object?>>();
var eketRows = new List<Dictionary<string, object?>>();
foreach (var ebeln in ebelnKeys)
{
ekpoRows.AddRange(await ReadAllRowsAsync(client, connection.BaseUrl, "EKPOSet", "Ebeln,Ebelp,Matnr,Txz01,Matkl,Menge,Ktmng,Netwr,Loekz,Bukrs,Werks", $"Ebeln eq '{ebeln}'", "Ebelp", cancellationToken));
eketRows.AddRange(await ReadAllRowsAsync(client, connection.BaseUrl, "eketSet", "Ebeln,Ebelp,Etenr,Eindt,Menge,Wemng", $"Ebeln eq '{ebeln}'", "Ebelp,Etenr", cancellationToken));
}
await using var db = await _dbFactory.CreateDbContextAsync(cancellationToken);
var conn = (SqliteConnection)db.Database.GetDbConnection();
if (conn.State != ConnectionState.Open)
await conn.OpenAsync(cancellationToken);
var nowText = DateTime.UtcNow.ToString("O", CultureInfo.InvariantCulture);
await using var transaction = (SqliteTransaction)await conn.BeginTransactionAsync(cancellationToken);
await UpsertEkkoAsync(conn, transaction, changedEkko, nowText, cancellationToken);
await UpsertEkpoAsync(conn, transaction, ekpoRows, nowText, cancellationToken);
await UpsertEketAsync(conn, transaction, eketRows, nowText, cancellationToken);
await transaction.CommitAsync(cancellationToken);
var completed = DateTime.UtcNow;
var status = await GetStatusAsync(cancellationToken);
var message = $"Delta abgeschlossen: geaenderte Belege={ebelnKeys.Count:N0}, EKPO={ekpoRows.Count:N0}, EKET={eketRows.Count:N0}.";
await WriteStatusAsync("Delta", "Success", started, completed, deltaFrom, null, completed, status.EkkoRows, status.EkpoRows, status.EketRows, message, cancellationToken);
await _logService.WriteAsync("Purchasing", "Einkauf Delta erfolgreich", details: message);
return await GetStatusAsync(cancellationToken);
}
catch (Exception ex)
{
await WriteStatusAsync("Delta", "Error", started, DateTime.UtcNow, deltaFrom, null, current.LastSuccessfulDeltaAtUtc, current.EkkoRows, current.EkpoRows, current.EketRows, $"Delta fehlgeschlagen: {ex.Message}", cancellationToken);
await _logService.WriteAsync("Purchasing", "Einkauf Delta fehlgeschlagen", "Error", details: ex.ToString());
return await GetStatusAsync(cancellationToken);
}
}
private async Task<PurchasingSapConnection> ResolveConnectionAsync(CancellationToken cancellationToken)
{
await using var db = await _dbFactory.CreateDbContextAsync(cancellationToken);
var sap = await db.SourceSystemDefinitions.AsNoTracking().FirstOrDefaultAsync(x => x.Code == "SAP", cancellationToken)
?? throw new InvalidOperationException("SAP Quelle fehlt.");
var site = await db.Sites.AsNoTracking().FirstOrDefaultAsync(x => x.TSC == PurchasingDataSourcePageService.PurchasingTsc, cancellationToken)
?? throw new InvalidOperationException("Einkauf SAP Site fehlt.");
var serviceUrl = string.IsNullOrWhiteSpace(site.SapServiceUrl) ? sap.CentralServiceUrl : site.SapServiceUrl;
var username = string.IsNullOrWhiteSpace(site.UsernameOverride) ? sap.CentralUsername : site.UsernameOverride;
var password = string.IsNullOrWhiteSpace(site.PasswordOverride) ? sap.CentralPassword : site.PasswordOverride;
if (string.IsNullOrWhiteSpace(serviceUrl) || string.IsNullOrWhiteSpace(username) || string.IsNullOrWhiteSpace(password))
throw new InvalidOperationException("SAP URL oder Zugangsdaten fehlen.");
return new PurchasingSapConnection(serviceUrl.TrimEnd('/') + "/", username, password);
}
private static HttpClient CreateClient(string username, string password)
{
var client = new HttpClient { Timeout = TimeSpan.FromMinutes(5) };
var token = Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:{password}"));
client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", token);
client.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
return client;
}
private static async Task<List<Dictionary<string, object?>>> ReadAllRowsAsync(HttpClient client, string baseUrl, string entitySet, string select, string filter, string orderBy, CancellationToken cancellationToken)
{
var rows = new List<Dictionary<string, object?>>();
for (var skip = 0; ; skip += PageSize)
{
var url = $"{baseUrl}{entitySet}?$format=json&$top={PageSize}&$skip={skip}&$select={Uri.EscapeDataString(select)}";
if (!string.IsNullOrWhiteSpace(orderBy))
url += $"&$orderby={Uri.EscapeDataString(orderBy)}";
if (!string.IsNullOrWhiteSpace(filter))
url += $"&$filter={Uri.EscapeDataString(filter)}";
using var response = await client.GetAsync(url, cancellationToken);
if (!response.IsSuccessStatusCode)
{
var error = await response.Content.ReadAsStringAsync(cancellationToken);
throw new HttpRequestException($"SAP OData {entitySet} fehlgeschlagen ({(int)response.StatusCode} {response.ReasonPhrase}) URL={url} Antwort={TrimForLog(error)}");
}
var json = await response.Content.ReadAsStringAsync(cancellationToken);
var page = ParseRows(json);
if (page.Count == 0)
return rows;
rows.AddRange(page);
if (page.Count < PageSize)
return rows;
}
}
private static List<Dictionary<string, object?>> ParseRows(string json)
{
using var document = JsonDocument.Parse(json);
if (!document.RootElement.TryGetProperty("d", out var d) ||
!d.TryGetProperty("results", out var results) ||
results.ValueKind != JsonValueKind.Array)
return [];
return results.EnumerateArray()
.Select(item => item.EnumerateObject()
.Where(property => property.Name != "__metadata")
.ToDictionary(property => property.Name, property => ConvertJsonValue(property.Value), StringComparer.OrdinalIgnoreCase))
.ToList();
}
private static async Task UpsertEkkoAsync(SqliteConnection conn, SqliteTransaction transaction, IReadOnlyList<Dictionary<string, object?>> rows, string loadedAtUtc, CancellationToken cancellationToken)
{
const string sql = @"
INSERT OR REPLACE INTO PurchasingEkkoCache (Ebeln, Bedat, Aedat, Lifnr, Bukrs, Bsart, RawJson, LastLoadedAtUtc)
VALUES ($Ebeln, $Bedat, $Aedat, $Lifnr, $Bukrs, $Bsart, $RawJson, $LastLoadedAtUtc);";
foreach (var row in rows)
await ExecuteWithParametersAsync(conn, transaction, sql, new()
{
["$Ebeln"] = GetText(row, "Ebeln"),
["$Bedat"] = NormalizeSapDate(GetText(row, "Bedat")),
["$Aedat"] = NormalizeSapDate(GetText(row, "Aedat")),
["$Lifnr"] = GetText(row, "Lifnr"),
["$Bukrs"] = GetText(row, "Bukrs"),
["$Bsart"] = GetText(row, "Bsart"),
["$RawJson"] = JsonSerializer.Serialize(row),
["$LastLoadedAtUtc"] = loadedAtUtc
}, cancellationToken);
}
private static async Task UpsertEkpoAsync(SqliteConnection conn, SqliteTransaction transaction, IReadOnlyList<Dictionary<string, object?>> rows, string loadedAtUtc, CancellationToken cancellationToken)
{
const string sql = @"
INSERT OR REPLACE INTO PurchasingEkpoCache (Ebeln, Ebelp, Matnr, Txz01, Matkl, Menge, Meins, Netwr, Loekz, RawJson, LastLoadedAtUtc)
VALUES ($Ebeln, $Ebelp, $Matnr, $Txz01, $Matkl, $Menge, $Meins, $Netwr, $Loekz, $RawJson, $LastLoadedAtUtc);";
foreach (var row in rows)
await ExecuteWithParametersAsync(conn, transaction, sql, new()
{
["$Ebeln"] = GetText(row, "Ebeln"),
["$Ebelp"] = GetText(row, "Ebelp"),
["$Matnr"] = GetText(row, "Matnr"),
["$Txz01"] = GetText(row, "Txz01"),
["$Matkl"] = GetText(row, "Matkl"),
["$Menge"] = GetText(row, "Menge"),
["$Meins"] = GetText(row, "Meins"),
["$Netwr"] = GetText(row, "Netwr"),
["$Loekz"] = GetText(row, "Loekz"),
["$RawJson"] = JsonSerializer.Serialize(row),
["$LastLoadedAtUtc"] = loadedAtUtc
}, cancellationToken);
}
private static async Task UpsertEketAsync(SqliteConnection conn, SqliteTransaction transaction, IReadOnlyList<Dictionary<string, object?>> rows, string loadedAtUtc, CancellationToken cancellationToken)
{
const string sql = @"
INSERT OR REPLACE INTO PurchasingEketCache (Ebeln, Ebelp, Etenr, Eindt, Menge, Wemng, RawJson, LastLoadedAtUtc)
VALUES ($Ebeln, $Ebelp, $Etenr, $Eindt, $Menge, $Wemng, $RawJson, $LastLoadedAtUtc);";
foreach (var row in rows)
await ExecuteWithParametersAsync(conn, transaction, sql, new()
{
["$Ebeln"] = GetText(row, "Ebeln"),
["$Ebelp"] = GetText(row, "Ebelp"),
["$Etenr"] = GetText(row, "Etenr"),
["$Eindt"] = NormalizeSapDate(GetText(row, "Eindt")),
["$Menge"] = GetText(row, "Menge"),
["$Wemng"] = GetText(row, "Wemng"),
["$RawJson"] = JsonSerializer.Serialize(row),
["$LastLoadedAtUtc"] = loadedAtUtc
}, cancellationToken);
}
private async Task WriteStatusAsync(string mode, string status, DateTime? startedAtUtc, DateTime? completedAtUtc, DateTime? fromDate, DateTime? toDate, DateTime? lastSuccessfulDeltaAtUtc, int ekkoRows, int ekpoRows, int eketRows, string message, CancellationToken cancellationToken)
{
await using var db = await _dbFactory.CreateDbContextAsync(cancellationToken);
var conn = (SqliteConnection)db.Database.GetDbConnection();
if (conn.State != ConnectionState.Open)
await conn.OpenAsync(cancellationToken);
const string sql = @"
INSERT INTO PurchasingSyncState (Mode, Status, StartedAtUtc, CompletedAtUtc, FromDate, ToDate, LastSuccessfulDeltaAtUtc, EkkoRows, EkpoRows, EketRows, Message)
VALUES ($Mode, $Status, $StartedAtUtc, $CompletedAtUtc, $FromDate, $ToDate, $LastSuccessfulDeltaAtUtc, $EkkoRows, $EkpoRows, $EketRows, $Message);";
await ExecuteWithParametersAsync(conn, null, sql, new()
{
["$Mode"] = mode,
["$Status"] = status,
["$StartedAtUtc"] = FormatDateTime(startedAtUtc),
["$CompletedAtUtc"] = FormatDateTime(completedAtUtc),
["$FromDate"] = FormatDate(fromDate),
["$ToDate"] = FormatDate(toDate),
["$LastSuccessfulDeltaAtUtc"] = FormatDateTime(lastSuccessfulDeltaAtUtc),
["$EkkoRows"] = ekkoRows,
["$EkpoRows"] = ekpoRows,
["$EketRows"] = eketRows,
["$Message"] = message
}, cancellationToken);
}
private static async Task<PurchasingDataRefreshStatus> ReadLatestStatusAsync(SqliteConnection conn, CancellationToken cancellationToken)
{
await using var command = conn.CreateCommand();
command.CommandText = @"
SELECT Mode, Status, StartedAtUtc, CompletedAtUtc, FromDate, ToDate, LastSuccessfulDeltaAtUtc, EkkoRows, EkpoRows, EketRows, Message
FROM PurchasingSyncState
ORDER BY Id DESC
LIMIT 1;";
await using var reader = await command.ExecuteReaderAsync(cancellationToken);
if (!await reader.ReadAsync(cancellationToken))
return new PurchasingDataRefreshStatus { Status = "Empty", Message = "Noch kein Einkauf Full Load ausgefuehrt." };
return new PurchasingDataRefreshStatus
{
Mode = reader.GetString(0),
Status = reader.GetString(1),
StartedAtUtc = ParseDateTime(reader.GetString(2)),
CompletedAtUtc = ParseDateTime(reader.GetString(3)),
FromDate = ParseDate(reader.GetString(4)),
ToDate = ParseDate(reader.GetString(5)),
LastSuccessfulDeltaAtUtc = ParseDateTime(reader.GetString(6)),
EkkoRows = reader.GetInt32(7),
EkpoRows = reader.GetInt32(8),
EketRows = reader.GetInt32(9),
Message = reader.GetString(10)
};
}
private static async Task<int> CountTableAsync(SqliteConnection conn, string tableName, CancellationToken cancellationToken)
{
await using var command = conn.CreateCommand();
command.CommandText = $"SELECT COUNT(1) FROM {tableName};";
return Convert.ToInt32(await command.ExecuteScalarAsync(cancellationToken), CultureInfo.InvariantCulture);
}
private static async Task ExecuteAsync(SqliteConnection conn, SqliteTransaction transaction, string sql, CancellationToken cancellationToken)
{
await using var command = conn.CreateCommand();
command.Transaction = transaction;
command.CommandText = sql;
await command.ExecuteNonQueryAsync(cancellationToken);
}
private static async Task ExecuteWithParametersAsync(SqliteConnection conn, SqliteTransaction? transaction, string sql, Dictionary<string, object?> parameters, CancellationToken cancellationToken)
{
await using var command = conn.CreateCommand();
command.Transaction = transaction;
command.CommandText = sql;
foreach (var (key, value) in parameters)
command.Parameters.AddWithValue(key, value ?? DBNull.Value);
await command.ExecuteNonQueryAsync(cancellationToken);
}
private static object? ConvertJsonValue(JsonElement value) => value.ValueKind switch
{
JsonValueKind.String => value.GetString(),
JsonValueKind.Number when value.TryGetDecimal(out var number) => number,
JsonValueKind.True => true,
JsonValueKind.False => false,
JsonValueKind.Null => null,
_ => value.ToString()
};
private static string GetText(Dictionary<string, object?> row, string key)
=> row.TryGetValue(key, out var value) ? Convert.ToString(value, CultureInfo.InvariantCulture) ?? string.Empty : string.Empty;
private static string TrimForLog(string value)
=> value.Length <= 1000 ? value : value[..1000] + "...";
private static string? NormalizeSapDate(string value)
{
if (string.IsNullOrWhiteSpace(value))
return null;
if (DateTime.TryParse(value, CultureInfo.InvariantCulture, DateTimeStyles.AssumeLocal, out var parsed))
return parsed.ToString("yyyy-MM-dd", CultureInfo.InvariantCulture);
return DateTime.TryParseExact(value, "yyyyMMdd", CultureInfo.InvariantCulture, DateTimeStyles.None, out parsed)
? parsed.ToString("yyyy-MM-dd", CultureInfo.InvariantCulture)
: value;
}
private static string FormatDateTime(DateTime? value)
=> value?.ToString("O", CultureInfo.InvariantCulture) ?? string.Empty;
private static string FormatDate(DateTime? value)
=> value?.ToString("yyyy-MM-dd", CultureInfo.InvariantCulture) ?? string.Empty;
private static DateTime? ParseDateTime(string value)
=> DateTime.TryParse(value, CultureInfo.InvariantCulture, DateTimeStyles.AssumeUniversal, out var parsed) ? parsed : null;
private static DateTime? ParseDate(string value)
=> DateTime.TryParseExact(value, "yyyy-MM-dd", CultureInfo.InvariantCulture, DateTimeStyles.None, out var parsed) ? parsed : null;
private sealed record PurchasingSapConnection(string BaseUrl, string Username, string Password);
}