forked from Geotab/mygeotab-api-adapter
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathFaultDataProcessor.cs
388 lines (349 loc) · 23.1 KB
/
FaultDataProcessor.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
using Microsoft.Extensions.Hosting;
using MyGeotabAPIAdapter.Configuration;
using MyGeotabAPIAdapter.Database;
using MyGeotabAPIAdapter.Database.Caches;
using MyGeotabAPIAdapter.Database.DataAccess;
using MyGeotabAPIAdapter.Database.EntityMappers;
using MyGeotabAPIAdapter.Database.EntityPersisters;
using MyGeotabAPIAdapter.Database.Models;
using MyGeotabAPIAdapter.Exceptions;
using MyGeotabAPIAdapter.Helpers;
using MyGeotabAPIAdapter.Logging;
using NLog;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
namespace MyGeotabAPIAdapter.DataOptimizer
{
/// <summary>
/// A <see cref="BackgroundService"/> that handles ETL processing of FaultData data from the Adapter database to the Optimizer database.
/// </summary>
class FaultDataProcessor : BackgroundService
{
// Identity of the running assembly and of this class, used when composing log messages.
string AssemblyName => GetType().Assembly.GetName().Name;
string AssemblyVersion => GetType().Assembly.GetName().Version.ToString();
static string CurrentClassName => nameof(FaultDataProcessor);

// Prefix handed to the exception handler for every exception caught by the processing loop.
static string DefaultErrorMessagePrefix => $"{CurrentClassName} process caught an exception";

/// <summary>
/// Batch-size threshold for throttling: when a batch contains fewer records than this, the processing loop pauses for the configured execution interval before its next iteration.
/// </summary>
static int ThrottleEngagingBatchRecordCount => 1000;

// Number of records returned by the most recent batch query.
int lastBatchRecordCount = 0;

// Dependencies supplied through the constructor.
readonly IAdapterDatabaseObjectNames adapterDatabaseObjectNames;
readonly IConnectionInfoContainer connectionInfoContainer;
readonly IDataOptimizerConfiguration dataOptimizerConfiguration;
readonly IDateTimeHelper dateTimeHelper;
readonly IDbFaultDataDbFaultDataTEntityMapper dbFaultDataDbFaultDataTEntityMapper;
readonly IGenericEntityPersister<DbFaultData> dbFaultDataEntityPersister;
readonly IGenericEntityPersister<DbFaultDataT> dbFaultDataTEntityPersister;
readonly IGenericDbObjectCache<DbDeviceT> dbDeviceTObjectCache;
readonly DbDiagnosticIdTObjectCache dbDiagnosticIdTObjectCache;
readonly IGenericDbObjectCache<DbDiagnosticT> dbDiagnosticTObjectCache;
readonly IGenericDbObjectCache<DbUserT> dbUserTObjectCache;
readonly IExceptionHelper exceptionHelper;
readonly IMessageLogger messageLogger;
readonly IOptimizerDatabaseObjectNames optimizerDatabaseObjectNames;
readonly IOptimizerEnvironment optimizerEnvironment;
readonly IPrerequisiteProcessorChecker prerequisiteProcessorChecker;
readonly IProcessorTracker processorTracker;
readonly IStateMachine stateMachine;

// Class-level NLog logger.
readonly Logger logger = LogManager.GetCurrentClassLogger();

// Unit-of-work contexts: one is used with the Adapter database, the other with the Optimizer database.
readonly UnitOfWorkContext adapterContext;
readonly UnitOfWorkContext optimizerContext;

/// <summary>
/// The UTC time captured at the start of the most recent iteration, just before records are requested from the DbFaultData table in the Adapter database. The value is written to the processor tracking record.
/// </summary>
DateTime DbFaultDatasLastQueriedUtc { get; set; }
/// <summary>
/// Creates a new <see cref="FaultDataProcessor"/>, storing each supplied dependency in its corresponding field and logging the identifier of each <see cref="UnitOfWorkContext"/> associated with this instance.
/// </summary>
public FaultDataProcessor(IDataOptimizerConfiguration dataOptimizerConfiguration, IOptimizerDatabaseObjectNames optimizerDatabaseObjectNames, IOptimizerEnvironment optimizerEnvironment, IPrerequisiteProcessorChecker prerequisiteProcessorChecker, IAdapterDatabaseObjectNames adapterDatabaseObjectNames, IDateTimeHelper dateTimeHelper, IExceptionHelper exceptionHelper, IMessageLogger messageLogger, IStateMachine stateMachine, IConnectionInfoContainer connectionInfoContainer, IProcessorTracker processorTracker, IDbFaultDataDbFaultDataTEntityMapper dbFaultDataDbFaultDataTEntityMapper, IGenericEntityPersister<DbFaultData> dbFaultDataEntityPersister, IGenericDbObjectCache<DbDeviceT> dbDeviceTObjectCache, DbDiagnosticIdTObjectCache dbDiagnosticIdTObjectCache, IGenericDbObjectCache<DbDiagnosticT> dbDiagnosticTObjectCache, IGenericDbObjectCache<DbUserT> dbUserTObjectCache, IGenericEntityPersister<DbFaultDataT> dbFaultDataTEntityPersister, UnitOfWorkContext adapterContext, UnitOfWorkContext optimizerContext)
{
    var currentMethod = MethodBase.GetCurrentMethod();
    var qualifiedMethodName = $"{currentMethod.ReflectedType.Name}.{currentMethod.Name}";
    logger.Trace($"Begin {qualifiedMethodName}");

    // Configuration, environment and database naming.
    this.dataOptimizerConfiguration = dataOptimizerConfiguration;
    this.optimizerEnvironment = optimizerEnvironment;
    this.optimizerDatabaseObjectNames = optimizerDatabaseObjectNames;
    this.adapterDatabaseObjectNames = adapterDatabaseObjectNames;
    this.connectionInfoContainer = connectionInfoContainer;

    // Helpers, logging and state tracking.
    this.dateTimeHelper = dateTimeHelper;
    this.exceptionHelper = exceptionHelper;
    this.messageLogger = messageLogger;
    this.stateMachine = stateMachine;
    this.processorTracker = processorTracker;
    this.prerequisiteProcessorChecker = prerequisiteProcessorChecker;

    // Entity mapping and persistence.
    this.dbFaultDataDbFaultDataTEntityMapper = dbFaultDataDbFaultDataTEntityMapper;
    this.dbFaultDataEntityPersister = dbFaultDataEntityPersister;
    this.dbFaultDataTEntityPersister = dbFaultDataTEntityPersister;

    // Object caches used for identifier lookups.
    this.dbDeviceTObjectCache = dbDeviceTObjectCache;
    this.dbDiagnosticIdTObjectCache = dbDiagnosticIdTObjectCache;
    this.dbDiagnosticTObjectCache = dbDiagnosticTObjectCache;
    this.dbUserTObjectCache = dbUserTObjectCache;

    // Unit-of-work contexts; the identifier of each is logged as it is attached.
    this.adapterContext = adapterContext;
    logger.Debug($"{nameof(UnitOfWorkContext)} [Id: {adapterContext.Id}] associated with {CurrentClassName}.");
    this.optimizerContext = optimizerContext;
    logger.Debug($"{nameof(UnitOfWorkContext)} [Id: {optimizerContext.Id}] associated with {CurrentClassName}.");

    logger.Trace($"End {qualifiedMethodName}");
}
/// <summary>
/// Iteratively executes the business logic until the service is stopped. Each iteration retrieves a batch of <see cref="DbFaultData"/> records from the Adapter database, maps every record whose device and diagnostic can be resolved to a <see cref="DbFaultDataT"/>, persists the mapped records and the processor tracking information to the Optimizer database, and flags the processed source records for deletion from the Adapter database.
/// </summary>
/// <param name="stoppingToken">The <see cref="CancellationToken"/> that signals that the service is stopping.</param>
/// <returns>A <see cref="Task"/> representing the processing loop.</returns>
/// <exception cref="Exception">Thrown when an <see cref="OperationCanceledException"/> is raised during an iteration. The original exception is supplied as the <see cref="Exception.InnerException"/>.</exception>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    MethodBase methodBase = MethodBase.GetCurrentMethod();
    logger.Trace($"Begin {methodBase.ReflectedType.Name}.{methodBase.Name}");

    // Logs that a record was skipped because its diagnostic could not be resolved.
    void LogDiagnosticNotFound(DbFaultData dbFaultData) =>
        logger.Warn($"Could not process {nameof(DbFaultData)} '{dbFaultData.id} (GeotabId {dbFaultData.GeotabId})' because a {nameof(DbDiagnosticT)} with a {nameof(DbDiagnosticT.GeotabId)} matching the {nameof(DbFaultData.DiagnosticId)} could not be found.");

    while (!stoppingToken.IsCancellationRequested)
    {
        // If configured to operate on a schedule and the present time is currently outside of an operating window, delay until the next daily start time.
        if (dataOptimizerConfiguration.FaultDataProcessorOperationMode == OperationMode.Scheduled)
        {
            var timeSpanToNextDailyStartTimeUTC = dateTimeHelper.GetTimeSpanToNextDailyStartTimeUTC(dataOptimizerConfiguration.FaultDataProcessorDailyStartTimeUTC, dataOptimizerConfiguration.FaultDataProcessorDailyRunTimeSeconds);
            if (timeSpanToNextDailyStartTimeUTC != TimeSpan.Zero)
            {
                DateTime nextScheduledStartTimeUTC = DateTime.UtcNow.Add(timeSpanToNextDailyStartTimeUTC);
                messageLogger.LogScheduledServicePause(CurrentClassName, dataOptimizerConfiguration.FaultDataProcessorDailyStartTimeUTC.TimeOfDay, dataOptimizerConfiguration.FaultDataProcessorDailyRunTimeSeconds, nextScheduledStartTimeUTC);

                await Task.Delay(timeSpanToNextDailyStartTimeUTC, stoppingToken);

                DateTime nextScheduledPauseTimeUTC = DateTime.UtcNow.Add(TimeSpan.FromSeconds(dataOptimizerConfiguration.FaultDataProcessorDailyRunTimeSeconds));
                messageLogger.LogScheduledServiceResumption(CurrentClassName, dataOptimizerConfiguration.FaultDataProcessorDailyStartTimeUTC.TimeOfDay, dataOptimizerConfiguration.FaultDataProcessorDailyRunTimeSeconds, nextScheduledPauseTimeUTC);
            }
        }

        await WaitForPrerequisiteProcessorsIfNeededAsync(stoppingToken);

        // Abort if waiting for connectivity restoration.
        // NOTE(review): this method applies no delay of its own before re-checking the state. If the prerequisite check above returns immediately, this loop may spin while the state is Waiting - confirm.
        if (stateMachine.CurrentState == State.Waiting)
        {
            continue;
        }

        try
        {
            logger.Trace($"Started iteration of {methodBase.ReflectedType.Name}.{methodBase.Name}");

            using (var cancellationTokenSource = new CancellationTokenSource())
            {
                var engageExecutionThrottle = true;
                DbFaultDatasLastQueriedUtc = DateTime.UtcNow;

                // Initialize object caches.
                if (dbDeviceTObjectCache.IsInitialized == false)
                {
                    await dbDeviceTObjectCache.InitializeAsync(optimizerContext, Databases.OptimizerDatabase);
                }
                if (dbDiagnosticTObjectCache.IsInitialized == false)
                {
                    await dbDiagnosticTObjectCache.InitializeAsync(optimizerContext, Databases.OptimizerDatabase);
                }
                if (dbDiagnosticIdTObjectCache.IsInitialized == false)
                {
                    await dbDiagnosticIdTObjectCache.InitializeAsync(optimizerContext, Databases.OptimizerDatabase);
                }
                if (dbUserTObjectCache.IsInitialized == false)
                {
                    await dbUserTObjectCache.InitializeAsync(optimizerContext, Databases.OptimizerDatabase);
                }

                // Get a batch of DbFaultDatas. The batch is materialized exactly once so that the entities
                // mutated in the loop below are the same instances that are later handed to the persister.
                List<DbFaultData> dbFaultDatas;
                string sortColumnName = nameof(DbFaultData.DateTime);
                using (var adapterUOW = adapterContext.CreateUnitOfWork(Databases.AdapterDatabase))
                {
                    var dbFaultDataRepo = new DbFaultDataRepository2(adapterContext);
                    IEnumerable<DbFaultData> retrievedDbFaultDatas = await dbFaultDataRepo.GetAllAsync(cancellationTokenSource, dataOptimizerConfiguration.FaultDataProcessorBatchSize, null, sortColumnName);
                    dbFaultDatas = retrievedDbFaultDatas.ToList();
                }

                lastBatchRecordCount = dbFaultDatas.Count;
                if (lastBatchRecordCount > 0)
                {
                    // Only pause after this iteration if the batch was smaller than the throttle threshold.
                    engageExecutionThrottle = lastBatchRecordCount < ThrottleEngagingBatchRecordCount;

                    // Process the batch of DbFaultDatas.
                    #nullable enable
                    long? adapterDbLastId = null;
                    string? adapterDbLastGeotabId = null;
                    DateTime? adapterDbLastRecordCreationTimeUtc = null;
                    #nullable disable
                    var dbFaultDataTsToPersist = new List<DbFaultDataT>();
                    foreach (var dbFaultData in dbFaultDatas)
                    {
                        var deviceId = await dbDeviceTObjectCache.GetObjectIdAsync(dbFaultData.DeviceId);
                        if (deviceId == null)
                        {
                            logger.Warn($"Could not process {nameof(DbFaultData)} '{dbFaultData.id} (GeotabId {dbFaultData.GeotabId})' because a {nameof(DbDeviceT)} with a {nameof(DbDeviceT.GeotabId)} matching the {nameof(DbFaultData.DeviceId)} could not be found.");
                            continue;
                        }

                        // Guard against an unknown diagnostic id: dereferencing a null lookup result would raise
                        // a NullReferenceException, which the generic handler below treats as fatal.
                        var diagnosticIdT = await dbDiagnosticIdTObjectCache.GetObjectAsync(dbFaultData.DiagnosticId);
                        if (diagnosticIdT == null)
                        {
                            LogDiagnosticNotFound(dbFaultData);
                            continue;
                        }

                        var diagnosticId = await dbDiagnosticTObjectCache.GetObjectIdAsync(diagnosticIdT.GeotabGUID);
                        if (diagnosticId == null)
                        {
                            LogDiagnosticNotFound(dbFaultData);
                            continue;
                        }

                        var dismissUserId = await dbUserTObjectCache.GetObjectIdAsync(dbFaultData.DismissUserId);

                        var dbFaultDataT = dbFaultDataDbFaultDataTEntityMapper.CreateEntity(dbFaultData, (long)deviceId, (long)diagnosticId, dismissUserId);
                        dbFaultDataTsToPersist.Add(dbFaultDataT);

                        // Only records that were mapped successfully are flagged for deletion; skipped records keep their existing write operation type.
                        dbFaultData.DatabaseWriteOperationType = Common.DatabaseWriteOperationType.Delete;
                        adapterDbLastId = dbFaultData.id;
                        adapterDbLastGeotabId = dbFaultData.GeotabId;
                        adapterDbLastRecordCreationTimeUtc = dbFaultData.RecordCreationTimeUtc;
                    }

                    // Persist changes to database using a Unit of Work for each database.
                    using (var adapterUOW = adapterContext.CreateUnitOfWork(Databases.AdapterDatabase))
                    {
                        using (var optimizerUOW = optimizerContext.CreateUnitOfWork(Databases.OptimizerDatabase))
                        {
                            try
                            {
                                // DbFaultDataT:
                                await dbFaultDataTEntityPersister.PersistEntitiesToDatabaseAsync(optimizerContext, dbFaultDataTsToPersist, cancellationTokenSource, Logging.LogLevel.Info);

                                // DbOProcessorTracking:
                                await processorTracker.UpdateDbOProcessorTrackingRecord(optimizerContext, DataOptimizerProcessor.FaultDataProcessor, DbFaultDatasLastQueriedUtc, adapterDbLastId, adapterDbLastRecordCreationTimeUtc, adapterDbLastGeotabId);

                                // DbFaultData:
                                await dbFaultDataEntityPersister.PersistEntitiesToDatabaseAsync(adapterContext, dbFaultDatas, cancellationTokenSource, Logging.LogLevel.Info);

                                // Commit transactions:
                                await optimizerUOW.CommitAsync();
                                await adapterUOW.CommitAsync();
                            }
                            catch (Exception)
                            {
                                await optimizerUOW.RollBackAsync();
                                await adapterUOW.RollBackAsync();
                                throw;
                            }
                        }
                    }
                }
                else
                {
                    logger.Debug($"No records were returned from the {adapterDatabaseObjectNames.DbFaultDataTableName} table in the {adapterDatabaseObjectNames.AdapterDatabaseNickname} database.");

                    // Update processor tracking info.
                    using (var uow = optimizerContext.CreateUnitOfWork(Databases.OptimizerDatabase))
                    {
                        try
                        {
                            await processorTracker.UpdateDbOProcessorTrackingRecord(optimizerContext, DataOptimizerProcessor.FaultDataProcessor, DbFaultDatasLastQueriedUtc, null, null, null);
                            await uow.CommitAsync();
                        }
                        catch (Exception)
                        {
                            await uow.RollBackAsync();
                            throw;
                        }
                    }
                }

                // If necessary, add a delay to implement the configured execution interval.
                if (engageExecutionThrottle)
                {
                    var delayTimeSpan = TimeSpan.FromSeconds(dataOptimizerConfiguration.FaultDataProcessorExecutionIntervalSeconds);
                    logger.Info($"{CurrentClassName} pausing for {delayTimeSpan} because fewer than {ThrottleEngagingBatchRecordCount} records were processed during the current execution interval.");
                    await Task.Delay(delayTimeSpan, stoppingToken);
                }
            }

            logger.Trace($"Completed iteration of {methodBase.ReflectedType.Name}.{methodBase.Name}");
        }
        catch (OperationCanceledException operationCanceledException)
        {
            // The thrown type is unchanged, but the original exception is attached so that its stack trace is not lost.
            string errorMessage = $"{CurrentClassName} process cancelled.";
            logger.Warn(errorMessage);
            throw new Exception(errorMessage, operationCanceledException);
        }
        catch (AdapterDatabaseConnectionException databaseConnectionException)
        {
            HandleException(databaseConnectionException, NLogLogLevelName.Error, DefaultErrorMessagePrefix);
        }
        catch (OptimizerDatabaseConnectionException optimizerDatabaseConnectionException)
        {
            HandleException(optimizerDatabaseConnectionException, NLogLogLevelName.Error, DefaultErrorMessagePrefix);
        }
        catch (Exception ex)
        {
            // If an exception hasn't been handled to this point, log it and kill the process.
            HandleException(ex, NLogLogLevelName.Fatal, DefaultErrorMessagePrefix);
        }
    }

    logger.Trace($"End {methodBase.ReflectedType.Name}.{methodBase.Name}");
}
/// <summary>
/// Logs the supplied <paramref name="exception"/> via the <see cref="IExceptionHelper"/> and then reacts to it. A database connectivity exception moves the <see cref="stateMachine"/> into <see cref="State.Waiting"/> with the matching <see cref="StateReason"/>. A <paramref name="logLevel"/> of <see cref="NLogLogLevelName.Fatal"/> causes the current process to be killed.
/// </summary>
/// <param name="exception">The <see cref="Exception"/> to be logged and handled.</param>
/// <param name="logLevel">The <see cref="NLogLogLevelName"/> at which the exception is logged.</param>
/// <param name="errorMessagePrefix">The text passed to the exception helper as the start of the error message.</param>
void HandleException(Exception exception, NLogLogLevelName logLevel, string errorMessagePrefix)
{
    exceptionHelper.LogException(exception, logLevel, errorMessagePrefix);

    // Connectivity problems put the state machine into the Waiting state; other exception types leave it untouched.
    switch (exception)
    {
        case AdapterDatabaseConnectionException _:
            stateMachine.SetState(State.Waiting, StateReason.AdapterDatabaseNotAvailable);
            break;
        case OptimizerDatabaseConnectionException _:
            stateMachine.SetState(State.Waiting, StateReason.OptimizerDatabaseNotAvailable);
            break;
    }

    if (logLevel != NLogLogLevelName.Fatal)
    {
        return;
    }

    // Fatal: terminate the hosting process.
    System.Diagnostics.Process.GetCurrentProcess().Kill();
}
/// <summary>
/// Starts the current <see cref="FaultDataProcessor"/> instance. The Optimizer environment is validated and the processor tracking record is updated with the Optimizer version and machine name. The background processing is then started only if the service has been enabled in the configuration.
/// </summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> passed on to the base implementation.</param>
/// <returns>A <see cref="Task"/> representing the start operation.</returns>
public override async Task StartAsync(CancellationToken cancellationToken)
{
    var currentMethod = MethodBase.GetCurrentMethod();
    logger.Trace($"Begin {currentMethod.ReflectedType.Name}.{currentMethod.Name}");

    // Validate the environment against the existing processor tracking records.
    var trackingRecords = await processorTracker.GetDbOProcessorTrackingListAsync();
    optimizerEnvironment.ValidateOptimizerEnvironment(trackingRecords, DataOptimizerProcessor.FaultDataProcessor);

    // Record the Optimizer version and machine name for this processor.
    using (var trackingUOW = optimizerContext.CreateUnitOfWork(Databases.OptimizerDatabase))
    {
        try
        {
            var optimizerVersion = optimizerEnvironment.OptimizerVersion.ToString();
            await processorTracker.UpdateDbOProcessorTrackingRecord(optimizerContext, DataOptimizerProcessor.FaultDataProcessor, optimizerVersion, optimizerEnvironment.OptimizerMachineName);
            await trackingUOW.CommitAsync();
        }
        catch (Exception)
        {
            await trackingUOW.RollBackAsync();
            throw;
        }
    }

    // Only start this service if it has been configured to be enabled.
    if (dataOptimizerConfiguration.EnableFaultDataProcessor != true)
    {
        logger.Warn($"******** WARNING - SERVICE DISABLED: The {AssemblyName}.{CurrentClassName} service has not been enabled and will NOT be started.");
        return;
    }

    logger.Info($"******** STARTING SERVICE: {AssemblyName}.{CurrentClassName} (v{AssemblyVersion})");
    await base.StartAsync(cancellationToken);
}
/// <summary>
/// Stops the current <see cref="FaultDataProcessor"/> instance. The "stopped" message is logged before the base implementation is invoked.
/// </summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> passed on to the base implementation.</param>
/// <returns>The <see cref="Task"/> returned by the base implementation.</returns>
public override Task StopAsync(CancellationToken cancellationToken)
{
    var currentMethod = MethodBase.GetCurrentMethod();
    var qualifiedMethodName = $"{currentMethod.ReflectedType.Name}.{currentMethod.Name}";
    logger.Trace($"Begin {qualifiedMethodName}");

    logger.Info($"******** STOPPED SERVICE: {AssemblyName}.{CurrentClassName} (v{AssemblyVersion}) ********");

    return base.StopAsync(cancellationToken);
}
/// <summary>
/// Asks the <see cref="IPrerequisiteProcessorChecker"/> to wait, if needed, for the processors that this processor depends on: <see cref="DataOptimizerProcessor.DeviceProcessor"/>, <see cref="DataOptimizerProcessor.DiagnosticProcessor"/> and <see cref="DataOptimizerProcessor.UserProcessor"/>. The waiting and logging behaviour itself is implemented by the checker.
/// </summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> passed on to the checker.</param>
/// <returns>A <see cref="Task"/> that completes when the checker returns.</returns>
public async Task WaitForPrerequisiteProcessorsIfNeededAsync(CancellationToken cancellationToken)
{
    var currentMethod = MethodBase.GetCurrentMethod();
    var qualifiedMethodName = $"{currentMethod.ReflectedType.Name}.{currentMethod.Name}";
    logger.Trace($"Begin {qualifiedMethodName}");

    // Processors whose output this processor relies on.
    var requiredProcessors = new List<DataOptimizerProcessor>();
    requiredProcessors.Add(DataOptimizerProcessor.DeviceProcessor);
    requiredProcessors.Add(DataOptimizerProcessor.DiagnosticProcessor);
    requiredProcessors.Add(DataOptimizerProcessor.UserProcessor);

    await prerequisiteProcessorChecker.WaitForPrerequisiteProcessorsIfNeededAsync(CurrentClassName, requiredProcessors, cancellationToken);

    logger.Trace($"End {qualifiedMethodName}");
}
}
}