Optimize Fuchs_DataService: parallel file sync, shared HttpClient, cancellation
Reviewed the data service against mfr_interface_description.md. The OData entity sync already follows @odata.nextLink and now inherits the MFR client's transient retry + timeout, so it is spec-aligned. Reliability/performance improvements: - MFRClient.GetFile no longer news up an HttpClient per call (socket-exhaustion risk); added GetFileAsync backed by one shared static HttpClient with per-request auth, and GetFile delegates to it. - GetInvoiceFiles_async now downloads + stores invoice PDFs in parallel (bounded concurrency 4) via Parallel.ForEachAsync instead of sequentially. - Threaded CancellationToken from the MfrSync job through UpdateIfNecessary_async/ UpdateRequested_async/GetInvoiceFiles_async and the entity-sync loops for graceful shutdown (cooperative checks between iterations). Entity-table sync is left sequential on purpose (referential ordering by updateneed). IFdsMfr sync methods gained optional CancellationToken params (default) — the web app only uses the read methods, so this stays source-compatible with Fuchs. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -23,9 +23,9 @@ public class FdsService : ServiceControl
|
||||
new PeriodicJobDefinition("MfrSync", interval, async ct =>
|
||||
{
|
||||
bool debug = FdsConfig.DebugDetails;
|
||||
await mfr.UpdateIfNecessary_async(debug);
|
||||
await mfr.UpdateRequested_async(debug);
|
||||
await mfr.GetInvoiceFiles_async(debug);
|
||||
await mfr.UpdateIfNecessary_async(debug, ct);
|
||||
await mfr.UpdateRequested_async(debug, ct);
|
||||
await mfr.GetInvoiceFiles_async(debug, ct);
|
||||
})
|
||||
};
|
||||
|
||||
|
||||
+32
-17
@@ -43,15 +43,19 @@ public class FdsMfr : IFdsMfr
|
||||
None = 0
|
||||
}
|
||||
|
||||
public async Task UpdateIfNecessary_async(bool debugDetails = false)
|
||||
/// <summary>Max parallel invoice-file downloads (independent per file).</summary>
|
||||
private const int InvoiceFileDownloadConcurrency = 4;
|
||||
|
||||
public async Task UpdateIfNecessary_async(bool debugDetails = false, CancellationToken cancellationToken = default)
|
||||
{
|
||||
using var mfr = new FdsMfrClient(_loggerFactory);
|
||||
try
|
||||
{
|
||||
if (debugDetails) FdsDebug.DebugToFile("UpdateIfNecessary_async - unn - start awaited", filename: "DebugDetail.txt");
|
||||
await mfr.Update__Entitytables(debugDetails);
|
||||
await mfr.Update__Entitytables(debugDetails, cancellationToken: cancellationToken);
|
||||
if (debugDetails) FdsDebug.DebugToFile("UpdateIfNecessary_async - unn - completed", filename: "DebugDetail.txt");
|
||||
}
|
||||
catch (OperationCanceledException) { throw; }
|
||||
catch (Exception ex)
|
||||
{
|
||||
FdsDebug.DebugLog("UpdateIfNecessary_async - main unn", exc: ex);
|
||||
@@ -59,15 +63,16 @@ public class FdsMfr : IFdsMfr
|
||||
}
|
||||
}
|
||||
|
||||
public async Task UpdateRequested_async(bool debugDetails = false)
|
||||
public async Task UpdateRequested_async(bool debugDetails = false, CancellationToken cancellationToken = default)
|
||||
{
|
||||
using var mfr = new FdsMfrClient(_loggerFactory);
|
||||
try
|
||||
{
|
||||
if (debugDetails) FdsDebug.DebugToFile("UpdateRequested_async - unn - start awaited", filename: "DebugDetail.txt");
|
||||
await mfr.Update__EntityRequests(debugDetails);
|
||||
await mfr.Update__EntityRequests(debugDetails, cancellationToken);
|
||||
if (debugDetails) FdsDebug.DebugToFile("UpdateRequested_async - unn - completed", filename: "DebugDetail.txt");
|
||||
}
|
||||
catch (OperationCanceledException) { throw; }
|
||||
catch (Exception ex)
|
||||
{
|
||||
FdsDebug.DebugLog("UpdateRequested_async - main unn", exc: ex);
|
||||
@@ -75,7 +80,7 @@ public class FdsMfr : IFdsMfr
|
||||
}
|
||||
}
|
||||
|
||||
public async Task GetInvoiceFiles_async(bool debugDetails = false)
|
||||
public async Task GetInvoiceFiles_async(bool debugDetails = false, CancellationToken cancellationToken = default)
|
||||
{
|
||||
using var mfr = new FdsMfrClient(_loggerFactory);
|
||||
try
|
||||
@@ -86,35 +91,45 @@ public class FdsMfr : IFdsMfr
|
||||
FdsShared.FDSConnectionString(), SqlParameterList: null, options: new FdsSqlOptions());
|
||||
if (dtbl.Count > 0)
|
||||
{
|
||||
foreach (DataRow ivrw in dtbl.DataTable.Rows)
|
||||
{
|
||||
string id = ivrw.nz("id"), docName = ivrw.nz("DocumentName"), fileurl = ivrw.nz("URI");
|
||||
if (!string.IsNullOrEmpty(id) && !string.IsNullOrEmpty(docName) && !string.IsNullOrEmpty(fileurl) && docName.EndsWith("pdf"))
|
||||
{
|
||||
var fl = mfr.GetFile(fileurl);
|
||||
if (fl != null && fl.Length > 0)
|
||||
var rows = dtbl.DataTable.Rows.Cast<DataRow>()
|
||||
.Select(r => (id: r.nz("id"), docName: r.nz("DocumentName"), url: r.nz("URI")))
|
||||
.Where(r => !string.IsNullOrEmpty(r.id) && !string.IsNullOrEmpty(r.docName)
|
||||
&& !string.IsNullOrEmpty(r.url) && r.docName.EndsWith("pdf"))
|
||||
.ToList();
|
||||
|
||||
int downloaded = 0;
|
||||
// Files are independent → download (and store) in parallel with bounded concurrency.
|
||||
await Parallel.ForEachAsync(rows,
|
||||
new ParallelOptions { MaxDegreeOfParallelism = InvoiceFileDownloadConcurrency, CancellationToken = cancellationToken },
|
||||
async (r, ct) =>
|
||||
{
|
||||
try
|
||||
{
|
||||
var fl = await mfr.GetFileAsync(r.url, throwErrorIfNotOk: false, cancellationToken: ct);
|
||||
if (fl is { Length: > 0 })
|
||||
{
|
||||
await setSQLValue_async(
|
||||
"EXECUTE [dbo].[fds__setMFRInvoiceFile] @Id, @filename, @file;",
|
||||
FdsShared.FDSConnectionString(),
|
||||
SqlParameterList: new ParamList(
|
||||
SQL_VarChar("@Id", id),
|
||||
SQL_VarChar("@filename", docName),
|
||||
SQL_VarChar("@Id", r.id),
|
||||
SQL_VarChar("@filename", r.docName),
|
||||
new SqlParameter("@file", fl) { SqlDbType = SqlDbType.VarBinary }),
|
||||
options: new FdsSqlOptions());
|
||||
Interlocked.Increment(ref downloaded);
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException) { throw; }
|
||||
catch (Exception fsex)
|
||||
{
|
||||
FdsDebug.DebugLog("GetInvoiceFiles_async - mfr storefile", exc: fsex);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
_logger.LogInformation("GetInvoiceFiles_async stored {Downloaded}/{Total} invoice files.", downloaded, rows.Count);
|
||||
}
|
||||
if (debugDetails) FdsDebug.DebugToFile("GetInvoiceFiles_async - completed", filename: "DebugDetail.txt");
|
||||
}
|
||||
catch (OperationCanceledException) { throw; }
|
||||
catch (Exception ex)
|
||||
{
|
||||
FdsDebug.DebugLog("GetInvoiceFiles_async - main unn", exc: ex);
|
||||
|
||||
@@ -42,6 +42,9 @@ public class FdsMfrClient : IDisposable
|
||||
public byte[]? GetFile(string address, bool throwErrorIfNotOk = true) =>
|
||||
_mfrClient.GetFile(address, throwErrorIfNotOk);
|
||||
|
||||
public Task<byte[]?> GetFileAsync(string address, bool throwErrorIfNotOk = true, CancellationToken cancellationToken = default) =>
|
||||
_mfrClient.GetFileAsync(address, throwErrorIfNotOk, cancellationToken);
|
||||
|
||||
public async Task<ODataEnvelope> ReadOData(string address, bool throwErrorIfNotOk = true) =>
|
||||
await _mfrClient.ReadOData(address, throwErrorIfNotOk);
|
||||
|
||||
@@ -374,7 +377,8 @@ public class FdsMfrClient : IDisposable
|
||||
private static string NewDatatableSql(string tablename) =>
|
||||
$"Select TOP(0) [setid] = CAST('' as varchar(50)), * FROM [dbo].[{tablename}];";
|
||||
|
||||
public async Task Update__Entitytables(bool debugDetails = false, EntityTypes? tgtEntityType = null)
|
||||
public async Task Update__Entitytables(bool debugDetails = false, EntityTypes? tgtEntityType = null,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
Action<string, string, string, Exception?> dtf = (note, info, data, ex) =>
|
||||
{
|
||||
@@ -398,6 +402,7 @@ public class FdsMfrClient : IDisposable
|
||||
{
|
||||
foreach (DataRow rw in updateableTables.Select("updateneed > 0", "updateneed DESC"))
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
string etname = rw.nz("entity_name", "");
|
||||
try
|
||||
{
|
||||
@@ -424,7 +429,7 @@ public class FdsMfrClient : IDisposable
|
||||
catch (Exception exa) { dlg("outer frame", "", "", exa); }
|
||||
}
|
||||
|
||||
public async Task Update__EntityRequests(bool debugDetails = false)
|
||||
public async Task Update__EntityRequests(bool debugDetails = false, CancellationToken cancellationToken = default)
|
||||
{
|
||||
Action<string, string, string, Exception?> dtf = (note, info, data, ex) =>
|
||||
{
|
||||
@@ -448,6 +453,7 @@ public class FdsMfrClient : IDisposable
|
||||
{
|
||||
foreach (DataRow rw in updateableRequests.Select("", "order"))
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
string etname = rw.nz("entity_name", "");
|
||||
long tgtid = rw.nint64("Id", -1);
|
||||
if (tgtid > -1 && !string.IsNullOrWhiteSpace(etname))
|
||||
|
||||
@@ -4,9 +4,9 @@ namespace fds;
|
||||
|
||||
public interface IFdsMfr
|
||||
{
|
||||
Task UpdateIfNecessary_async(bool debugDetails = false);
|
||||
Task UpdateRequested_async(bool debugDetails = false);
|
||||
Task GetInvoiceFiles_async(bool debugDetails = false);
|
||||
Task UpdateIfNecessary_async(bool debugDetails = false, CancellationToken cancellationToken = default);
|
||||
Task UpdateRequested_async(bool debugDetails = false, CancellationToken cancellationToken = default);
|
||||
Task GetInvoiceFiles_async(bool debugDetails = false, CancellationToken cancellationToken = default);
|
||||
FileInfo? GetReportDoc(ref byte[]? file, string reportid, bool debugDetails = false);
|
||||
FileInfo? GetFdsDoc(ref byte[]? file, string reportid, string type);
|
||||
FileInfo? GetDatevZip(ref Stream? stream, DateTime tgtdate, string mode, string authUser, bool includeFiles, bool debugDetails = false);
|
||||
|
||||
@@ -84,26 +84,46 @@ public class MFRClient : IDisposable
|
||||
return first;
|
||||
}
|
||||
|
||||
// One shared HttpClient per process avoids socket exhaustion from per-call instances.
|
||||
// Auth + user-agent are set per request so the shared instance stays stateless.
|
||||
private static readonly HttpClient _sharedHttpClient = new() { Timeout = TimeSpan.FromMinutes(5) };
|
||||
|
||||
public byte[]? GetFile(string address, bool throwErrorIfNotOk = true)
|
||||
=> GetFileAsync(address, throwErrorIfNotOk).GetAwaiter().GetResult();
|
||||
|
||||
public async Task<byte[]?> GetFileAsync(string address, bool throwErrorIfNotOk = true, CancellationToken cancellationToken = default)
|
||||
{
|
||||
byte[]? data = null;
|
||||
using var httpClient = new HttpClient();
|
||||
httpClient.DefaultRequestHeaders.UserAgent.ParseAdd(DownloadUserAgent);
|
||||
var credentials = Convert.ToBase64String(System.Text.Encoding.UTF8.GetBytes($"{_clientCredentials.Username}:{_clientCredentials.Password}"));
|
||||
httpClient.DefaultRequestHeaders.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Basic", credentials);
|
||||
try
|
||||
{
|
||||
data = httpClient.GetByteArrayAsync(new Uri(address)).GetAwaiter().GetResult();
|
||||
using var req = new HttpRequestMessage(HttpMethod.Get, new Uri(address));
|
||||
req.Headers.UserAgent.ParseAdd(DownloadUserAgent);
|
||||
var credentials = Convert.ToBase64String(
|
||||
System.Text.Encoding.UTF8.GetBytes($"{_clientCredentials.Username}:{_clientCredentials.Password}"));
|
||||
req.Headers.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Basic", credentials);
|
||||
|
||||
using var resp = await _sharedHttpClient.SendAsync(req, cancellationToken);
|
||||
if (!resp.IsSuccessStatusCode)
|
||||
{
|
||||
_logger.LogWarning("GetFile non-success status {Status} — address={Address}", resp.StatusCode, address);
|
||||
if (throwErrorIfNotOk && !HideCustomExceptions)
|
||||
throw new MFRClientException(resp.StatusCode, $"GetFile failed ({(int)resp.StatusCode})", address);
|
||||
return null;
|
||||
}
|
||||
return await resp.Content.ReadAsByteArrayAsync(cancellationToken);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
throw;
|
||||
}
|
||||
catch (HttpRequestException ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "GetFile failed with HttpRequestException — address={Address}", address);
|
||||
return null;
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
// Swallow
|
||||
return null;
|
||||
}
|
||||
return data;
|
||||
}
|
||||
|
||||
public async Task<string> GetEntities(bool throwErrorIfNotOk = true)
|
||||
|
||||
Reference in New Issue
Block a user