From 2c17171e77ef1e1b71efed90ddd209956a20cde5 Mon Sep 17 00:00:00 2001 From: Stefan Date: Fri, 5 Jun 2026 15:27:55 +0200 Subject: [PATCH] Optimize Fuchs_DataService: parallel file sync, shared HttpClient, cancellation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reviewed the data service against mfr_interface_description.md. The OData entity sync already follows @odata.nextLink and now inherits the MFR client's transient retry + timeout, so it is spec-aligned. Reliability/performance improvements: - MFRClient.GetFile no longer news up an HttpClient per call (socket-exhaustion risk); added GetFileAsync backed by one shared static HttpClient with per-request auth, and GetFile delegates to it. - GetInvoiceFiles_async now downloads + stores invoice PDFs in parallel (bounded concurrency 4) via Parallel.ForEachAsync instead of sequentially. - Threaded CancellationToken from the MfrSync job through UpdateIfNecessary_async/ UpdateRequested_async/GetInvoiceFiles_async and the entity-sync loops for graceful shutdown (cooperative checks between iterations). Entity-table sync is left sequential on purpose (referential ordering by updateneed). IFdsMfr sync methods gained optional CancellationToken params (default) — the web app only uses the read methods, so this stays source-compatible with Fuchs. Co-Authored-By: Claude Opus 4.8 --- Fuchs_DataService/FdsMain.cs | 6 ++-- Fuchs_DataService/FdsMfr.cs | 55 ++++++++++++++++++++----------- Fuchs_DataService/FdsMfrClient.cs | 10 ++++-- Fuchs_DataService/IFdsMfr.cs | 6 ++-- MFR_RESTClient/MFRClient.cs | 36 +++++++++++++++----- 5 files changed, 77 insertions(+), 36 deletions(-) diff --git a/Fuchs_DataService/FdsMain.cs b/Fuchs_DataService/FdsMain.cs index b2255fc..dc47e29 100644 --- a/Fuchs_DataService/FdsMain.cs +++ b/Fuchs_DataService/FdsMain.cs @@ -23,9 +23,9 @@ public class FdsService : ServiceControl new PeriodicJobDefinition("MfrSync", interval, async ct => { bool debug = FdsConfig.DebugDetails; - await mfr.UpdateIfNecessary_async(debug); - await mfr.UpdateRequested_async(debug); - await mfr.GetInvoiceFiles_async(debug); + await mfr.UpdateIfNecessary_async(debug, ct); + await mfr.UpdateRequested_async(debug, ct); + await mfr.GetInvoiceFiles_async(debug, ct); }) }; diff --git a/Fuchs_DataService/FdsMfr.cs b/Fuchs_DataService/FdsMfr.cs index ccb0bdf..e02131a 100644 --- a/Fuchs_DataService/FdsMfr.cs +++ b/Fuchs_DataService/FdsMfr.cs @@ -43,15 +43,19 @@ public class FdsMfr : IFdsMfr None = 0 } - public async Task UpdateIfNecessary_async(bool debugDetails = false) + /// Max parallel invoice-file downloads (independent per file). + private const int InvoiceFileDownloadConcurrency = 4; + + public async Task UpdateIfNecessary_async(bool debugDetails = false, CancellationToken cancellationToken = default) { using var mfr = new FdsMfrClient(_loggerFactory); try { if (debugDetails) FdsDebug.DebugToFile("UpdateIfNecessary_async - unn - start awaited", filename: "DebugDetail.txt"); - await mfr.Update__Entitytables(debugDetails); + await mfr.Update__Entitytables(debugDetails, cancellationToken: cancellationToken); if (debugDetails) FdsDebug.DebugToFile("UpdateIfNecessary_async - unn - completed", filename: "DebugDetail.txt"); } + catch (OperationCanceledException) { throw; } catch (Exception ex) { FdsDebug.DebugLog("UpdateIfNecessary_async - main unn", exc: ex); @@ -59,15 +63,16 @@ public class FdsMfr : IFdsMfr } } - public async Task UpdateRequested_async(bool debugDetails = false) + public async Task UpdateRequested_async(bool debugDetails = false, CancellationToken cancellationToken = default) { using var mfr = new FdsMfrClient(_loggerFactory); try { if (debugDetails) FdsDebug.DebugToFile("UpdateRequested_async - unn - start awaited", filename: "DebugDetail.txt"); - await mfr.Update__EntityRequests(debugDetails); + await mfr.Update__EntityRequests(debugDetails, cancellationToken); if (debugDetails) FdsDebug.DebugToFile("UpdateRequested_async - unn - completed", filename: "DebugDetail.txt"); } + catch (OperationCanceledException) { throw; } catch (Exception ex) { FdsDebug.DebugLog("UpdateRequested_async - main unn", exc: ex); @@ -75,7 +80,7 @@ public class FdsMfr : IFdsMfr } } - public async Task GetInvoiceFiles_async(bool debugDetails = false) + public async Task GetInvoiceFiles_async(bool debugDetails = false, CancellationToken cancellationToken = default) { using var mfr = new FdsMfrClient(_loggerFactory); try @@ -86,35 +91,45 @@ public class FdsMfr : IFdsMfr FdsShared.FDSConnectionString(), SqlParameterList: null, options: new FdsSqlOptions()); if (dtbl.Count > 0) { - foreach (DataRow ivrw in dtbl.DataTable.Rows) - { - string id = ivrw.nz("id"), docName = ivrw.nz("DocumentName"), fileurl = ivrw.nz("URI"); - if (!string.IsNullOrEmpty(id) && !string.IsNullOrEmpty(docName) && !string.IsNullOrEmpty(fileurl) && docName.EndsWith("pdf")) + var rows = dtbl.DataTable.Rows.Cast() + .Select(r => (id: r.nz("id"), docName: r.nz("DocumentName"), url: r.nz("URI"))) + .Where(r => !string.IsNullOrEmpty(r.id) && !string.IsNullOrEmpty(r.docName) + && !string.IsNullOrEmpty(r.url) && r.docName.EndsWith("pdf")) + .ToList(); + + int downloaded = 0; + // Files are independent → download (and store) in parallel with bounded concurrency. + await Parallel.ForEachAsync(rows, + new ParallelOptions { MaxDegreeOfParallelism = InvoiceFileDownloadConcurrency, CancellationToken = cancellationToken }, + async (r, ct) => { - var fl = mfr.GetFile(fileurl); - if (fl != null && fl.Length > 0) + try { - try + var fl = await mfr.GetFileAsync(r.url, throwErrorIfNotOk: false, cancellationToken: ct); + if (fl is { Length: > 0 }) { await setSQLValue_async( "EXECUTE [dbo].[fds__setMFRInvoiceFile] @Id, @filename, @file;", FdsShared.FDSConnectionString(), SqlParameterList: new ParamList( - SQL_VarChar("@Id", id), - SQL_VarChar("@filename", docName), + SQL_VarChar("@Id", r.id), + SQL_VarChar("@filename", r.docName), new SqlParameter("@file", fl) { SqlDbType = SqlDbType.VarBinary }), options: new FdsSqlOptions()); - } - catch (Exception fsex) - { - FdsDebug.DebugLog("GetInvoiceFiles_async - mfr storefile", exc: fsex); + Interlocked.Increment(ref downloaded); } } - } - } + catch (OperationCanceledException) { throw; } + catch (Exception fsex) + { + FdsDebug.DebugLog("GetInvoiceFiles_async - mfr storefile", exc: fsex); + } + }); + _logger.LogInformation("GetInvoiceFiles_async stored {Downloaded}/{Total} invoice files.", downloaded, rows.Count); } if (debugDetails) FdsDebug.DebugToFile("GetInvoiceFiles_async - completed", filename: "DebugDetail.txt"); } + catch (OperationCanceledException) { throw; } catch (Exception ex) { FdsDebug.DebugLog("GetInvoiceFiles_async - main unn", exc: ex); diff --git a/Fuchs_DataService/FdsMfrClient.cs b/Fuchs_DataService/FdsMfrClient.cs index d444229..6b00346 100644 --- a/Fuchs_DataService/FdsMfrClient.cs +++ b/Fuchs_DataService/FdsMfrClient.cs @@ -42,6 +42,9 @@ public class FdsMfrClient : IDisposable public byte[]? GetFile(string address, bool throwErrorIfNotOk = true) => _mfrClient.GetFile(address, throwErrorIfNotOk); + public Task GetFileAsync(string address, bool throwErrorIfNotOk = true, CancellationToken cancellationToken = default) => + _mfrClient.GetFileAsync(address, throwErrorIfNotOk, cancellationToken); + public async Task ReadOData(string address, bool throwErrorIfNotOk = true) => await _mfrClient.ReadOData(address, throwErrorIfNotOk); @@ -374,7 +377,8 @@ public class FdsMfrClient : IDisposable private static string NewDatatableSql(string tablename) => $"Select TOP(0) [setid] = CAST('' as varchar(50)), * FROM [dbo].[{tablename}];"; - public async Task Update__Entitytables(bool debugDetails = false, EntityTypes? tgtEntityType = null) + public async Task Update__Entitytables(bool debugDetails = false, EntityTypes? tgtEntityType = null, + CancellationToken cancellationToken = default) { Action dtf = (note, info, data, ex) => { @@ -398,6 +402,7 @@ public class FdsMfrClient : IDisposable { foreach (DataRow rw in updateableTables.Select("updateneed > 0", "updateneed DESC")) { + cancellationToken.ThrowIfCancellationRequested(); string etname = rw.nz("entity_name", ""); try { @@ -424,7 +429,7 @@ public class FdsMfrClient : IDisposable catch (Exception exa) { dlg("outer frame", "", "", exa); } } - public async Task Update__EntityRequests(bool debugDetails = false) + public async Task Update__EntityRequests(bool debugDetails = false, CancellationToken cancellationToken = default) { Action dtf = (note, info, data, ex) => { @@ -448,6 +453,7 @@ public class FdsMfrClient : IDisposable { foreach (DataRow rw in updateableRequests.Select("", "order")) { + cancellationToken.ThrowIfCancellationRequested(); string etname = rw.nz("entity_name", ""); long tgtid = rw.nint64("Id", -1); if (tgtid > -1 && !string.IsNullOrWhiteSpace(etname)) diff --git a/Fuchs_DataService/IFdsMfr.cs b/Fuchs_DataService/IFdsMfr.cs index 333fb99..75a6450 100644 --- a/Fuchs_DataService/IFdsMfr.cs +++ b/Fuchs_DataService/IFdsMfr.cs @@ -4,9 +4,9 @@ namespace fds; public interface IFdsMfr { - Task UpdateIfNecessary_async(bool debugDetails = false); - Task UpdateRequested_async(bool debugDetails = false); - Task GetInvoiceFiles_async(bool debugDetails = false); + Task UpdateIfNecessary_async(bool debugDetails = false, CancellationToken cancellationToken = default); + Task UpdateRequested_async(bool debugDetails = false, CancellationToken cancellationToken = default); + Task GetInvoiceFiles_async(bool debugDetails = false, CancellationToken cancellationToken = default); FileInfo? GetReportDoc(ref byte[]? file, string reportid, bool debugDetails = false); FileInfo? GetFdsDoc(ref byte[]? file, string reportid, string type); FileInfo? GetDatevZip(ref Stream? stream, DateTime tgtdate, string mode, string authUser, bool includeFiles, bool debugDetails = false); diff --git a/MFR_RESTClient/MFRClient.cs b/MFR_RESTClient/MFRClient.cs index 3eb8ceb..5ed2524 100644 --- a/MFR_RESTClient/MFRClient.cs +++ b/MFR_RESTClient/MFRClient.cs @@ -84,26 +84,46 @@ public class MFRClient : IDisposable return first; } + // One shared HttpClient per process avoids socket exhaustion from per-call instances. + // Auth + user-agent are set per request so the shared instance stays stateless. + private static readonly HttpClient _sharedHttpClient = new() { Timeout = TimeSpan.FromMinutes(5) }; + public byte[]? GetFile(string address, bool throwErrorIfNotOk = true) + => GetFileAsync(address, throwErrorIfNotOk).GetAwaiter().GetResult(); + + public async Task GetFileAsync(string address, bool throwErrorIfNotOk = true, CancellationToken cancellationToken = default) { - byte[]? data = null; - using var httpClient = new HttpClient(); - httpClient.DefaultRequestHeaders.UserAgent.ParseAdd(DownloadUserAgent); - var credentials = Convert.ToBase64String(System.Text.Encoding.UTF8.GetBytes($"{_clientCredentials.Username}:{_clientCredentials.Password}")); - httpClient.DefaultRequestHeaders.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Basic", credentials); try { - data = httpClient.GetByteArrayAsync(new Uri(address)).GetAwaiter().GetResult(); + using var req = new HttpRequestMessage(HttpMethod.Get, new Uri(address)); + req.Headers.UserAgent.ParseAdd(DownloadUserAgent); + var credentials = Convert.ToBase64String( + System.Text.Encoding.UTF8.GetBytes($"{_clientCredentials.Username}:{_clientCredentials.Password}")); + req.Headers.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Basic", credentials); + + using var resp = await _sharedHttpClient.SendAsync(req, cancellationToken); + if (!resp.IsSuccessStatusCode) + { + _logger.LogWarning("GetFile non-success status {Status} — address={Address}", resp.StatusCode, address); + if (throwErrorIfNotOk && !HideCustomExceptions) + throw new MFRClientException(resp.StatusCode, $"GetFile failed ({(int)resp.StatusCode})", address); + return null; + } + return await resp.Content.ReadAsByteArrayAsync(cancellationToken); + } + catch (OperationCanceledException) + { + throw; } catch (HttpRequestException ex) { _logger.LogWarning(ex, "GetFile failed with HttpRequestException — address={Address}", address); + return null; } catch (Exception) { - // Swallow + return null; } - return data; } public async Task GetEntities(bool throwErrorIfNotOk = true)