forked from Geotab/mygeotab-api-adapter
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathUserProcessor.cs
351 lines (317 loc) · 20 KB
/
UserProcessor.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
using Microsoft.Extensions.Hosting;
using MyGeotabAPIAdapter.Configuration;
using MyGeotabAPIAdapter.Database;
using MyGeotabAPIAdapter.Database.Caches;
using MyGeotabAPIAdapter.Database.DataAccess;
using MyGeotabAPIAdapter.Database.EntityMappers;
using MyGeotabAPIAdapter.Database.EntityPersisters;
using MyGeotabAPIAdapter.Database.Models;
using MyGeotabAPIAdapter.Exceptions;
using MyGeotabAPIAdapter.Helpers;
using MyGeotabAPIAdapter.Logging;
using NLog;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
namespace MyGeotabAPIAdapter.DataOptimizer
{
/// <summary>
/// A <see cref="BackgroundService"/> that handles ETL processing of User data from the Adapter database to the Optimizer database.
/// </summary>
class UserProcessor : BackgroundService
{
/// <summary>
/// Simple name of the assembly containing the runtime type of this instance; used in the service start/stop log messages.
/// </summary>
string AssemblyName => GetType().Assembly.GetName().Name;

/// <summary>
/// Version of the assembly containing the runtime type of this instance, rendered as text for the service start/stop log messages.
/// </summary>
string AssemblyVersion => GetType().Assembly.GetName().Version.ToString();

/// <summary>
/// Name of this class, used when composing log and error messages.
/// </summary>
static string CurrentClassName => nameof(UserProcessor);

/// <summary>
/// Prefix placed at the start of error messages logged for exceptions caught by the processing loop.
/// </summary>
static string DefaultErrorMessagePrefix => CurrentClassName + " process caught an exception";

/// <summary>
/// Batch-size threshold: when fewer changed records than this are found in an iteration, the processing loop pauses for the configured execution interval.
/// </summary>
static int ThrottleEngagingBatchRecordCount => 1;

// Dependencies supplied through the constructor.
readonly IAdapterDatabaseObjectNames adapterDatabaseObjectNames;
readonly IConnectionInfoContainer connectionInfoContainer;
readonly IDataOptimizerConfiguration dataOptimizerConfiguration;
readonly IDateTimeHelper dateTimeHelper;
readonly IDbUserDbUserTEntityMapper dbUserDbUserTEntityMapper;
readonly IGenericEntityPersister<DbUserT> dbUserTEntityPersister;
readonly IGenericDbObjectCache<DbUser> dbUserObjectCache;
readonly IGenericDbObjectCache<DbUserT> dbUserTObjectCache;
readonly IExceptionHelper exceptionHelper;
readonly IMessageLogger messageLogger;
readonly IOptimizerDatabaseObjectNames optimizerDatabaseObjectNames;
readonly IOptimizerEnvironment optimizerEnvironment;
readonly IProcessorTracker processorTracker;
readonly IStateMachine stateMachine;

// NLog logger for this class.
readonly Logger logger = LogManager.GetCurrentClassLogger();

// Unit-of-work contexts: adapterContext is used with the Adapter database, optimizerContext with the Optimizer database.
readonly UnitOfWorkContext adapterContext;
readonly UnitOfWorkContext optimizerContext;

/// <summary>
/// UTC timestamp captured at the start of the current processing iteration, before DbUser records are read. It is the value written to the processor tracking record for that iteration.
/// </summary>
DateTime DbUsersLastQueriedUtc { get; set; }
/// <summary>
/// Creates a <see cref="UserProcessor"/>, storing each supplied dependency in its corresponding field. The Id of each of the two <see cref="UnitOfWorkContext"/> instances is written to the debug log as it is stored.
/// </summary>
public UserProcessor(
    IDataOptimizerConfiguration dataOptimizerConfiguration,
    IOptimizerDatabaseObjectNames optimizerDatabaseObjectNames,
    IAdapterDatabaseObjectNames adapterDatabaseObjectNames,
    IDateTimeHelper dateTimeHelper,
    IExceptionHelper exceptionHelper,
    IMessageLogger messageLogger,
    IOptimizerEnvironment optimizerEnvironment,
    IStateMachine stateMachine,
    IConnectionInfoContainer connectionInfoContainer,
    IProcessorTracker processorTracker,
    IDbUserDbUserTEntityMapper dbUserDbUserTEntityMapper,
    IGenericEntityPersister<DbUserT> dbUserTEntityPersister,
    IGenericDbObjectCache<DbUser> dbUserObjectCache,
    IGenericDbObjectCache<DbUserT> dbUserTObjectCache,
    UnitOfWorkContext adapterContext,
    UnitOfWorkContext optimizerContext)
{
    MethodBase currentMethod = MethodBase.GetCurrentMethod();
    string qualifiedMethodName = $"{currentMethod.ReflectedType.Name}.{currentMethod.Name}";
    logger.Trace($"Begin {qualifiedMethodName}");

    // Configuration and database object names.
    this.dataOptimizerConfiguration = dataOptimizerConfiguration;
    this.adapterDatabaseObjectNames = adapterDatabaseObjectNames;
    this.optimizerDatabaseObjectNames = optimizerDatabaseObjectNames;
    this.optimizerEnvironment = optimizerEnvironment;
    this.connectionInfoContainer = connectionInfoContainer;

    // Helpers, logging and state tracking.
    this.dateTimeHelper = dateTimeHelper;
    this.exceptionHelper = exceptionHelper;
    this.messageLogger = messageLogger;
    this.stateMachine = stateMachine;
    this.processorTracker = processorTracker;

    // Entity mapping, persistence and caches.
    this.dbUserDbUserTEntityMapper = dbUserDbUserTEntityMapper;
    this.dbUserTEntityPersister = dbUserTEntityPersister;
    this.dbUserObjectCache = dbUserObjectCache;
    this.dbUserTObjectCache = dbUserTObjectCache;

    // Unit-of-work contexts; each one is logged right after it is stored.
    this.adapterContext = adapterContext;
    logger.Debug($"{nameof(UnitOfWorkContext)} [Id: {adapterContext.Id}] associated with {CurrentClassName}.");
    this.optimizerContext = optimizerContext;
    logger.Debug($"{nameof(UnitOfWorkContext)} [Id: {optimizerContext.Id}] associated with {CurrentClassName}.");

    logger.Trace($"End {qualifiedMethodName}");
}
/// <summary>
/// Iteratively executes the User ETL logic until the service is stopped. Each iteration:
/// 1. in scheduled mode, waits for the next daily start time when the helper reports a non-zero wait;
/// 2. skips processing while the state machine is in <see cref="State.Waiting"/> or while the configured execution interval has not elapsed (pausing briefly before re-checking);
/// 3. initializes the DbUser and DbUserT caches if needed, maps every DbUser changed since the last processing time to a new or updated DbUserT, persists them together with the processor tracking record in one unit of work, and refreshes the DbUserT cache;
/// 4. updates the processor tracking record on its own when step 3 did not do so;
/// 5. pauses for the configured execution interval when fewer than <see cref="ThrottleEngagingBatchRecordCount"/> records were processed.
/// </summary>
/// <param name="stoppingToken">Token that signals the service is stopping; it ends the loop and cancels any pending delay.</param>
/// <returns>A <see cref="Task"/> that completes when the processing loop exits.</returns>
/// <exception cref="Exception">Thrown when an <see cref="OperationCanceledException"/> is caught during an iteration; the original exception is supplied as the inner exception.</exception>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    // MethodBase.GetCurrentMethod() is intentionally avoided: inside an async method it resolves to the
    // compiler-generated state machine's MoveNext method, which makes the trace output misleading.
    const string methodName = nameof(ExecuteAsync);

    // Pause applied before re-checking a skip condition so that the loop does not spin at full speed.
    var idlePollingDelay = TimeSpan.FromSeconds(1);

    // Waits for idlePollingDelay; a cancellation is swallowed so that the loop condition can end the loop normally.
    async Task PauseBeforeRecheckAsync()
    {
        try
        {
            await Task.Delay(idlePollingDelay, stoppingToken);
        }
        catch (OperationCanceledException)
        {
            // The service is stopping; the while condition below will terminate the loop.
        }
    }

    logger.Trace($"Begin {CurrentClassName}.{methodName}");

    while (!stoppingToken.IsCancellationRequested)
    {
        // If configured to operate on a schedule and the present time is currently outside of an operating window, delay until the next daily start time.
        if (dataOptimizerConfiguration.UserProcessorOperationMode == OperationMode.Scheduled)
        {
            var timeSpanToNextDailyStartTimeUTC = dateTimeHelper.GetTimeSpanToNextDailyStartTimeUTC(dataOptimizerConfiguration.UserProcessorDailyStartTimeUTC, dataOptimizerConfiguration.UserProcessorDailyRunTimeSeconds);
            if (timeSpanToNextDailyStartTimeUTC != TimeSpan.Zero)
            {
                DateTime nextScheduledStartTimeUTC = DateTime.UtcNow.Add(timeSpanToNextDailyStartTimeUTC);
                messageLogger.LogScheduledServicePause(CurrentClassName, dataOptimizerConfiguration.UserProcessorDailyStartTimeUTC.TimeOfDay, dataOptimizerConfiguration.UserProcessorDailyRunTimeSeconds, nextScheduledStartTimeUTC);

                await Task.Delay(timeSpanToNextDailyStartTimeUTC, stoppingToken);

                DateTime nextScheduledPauseTimeUTC = DateTime.UtcNow.Add(TimeSpan.FromSeconds(dataOptimizerConfiguration.UserProcessorDailyRunTimeSeconds));
                messageLogger.LogScheduledServiceResumption(CurrentClassName, dataOptimizerConfiguration.UserProcessorDailyStartTimeUTC.TimeOfDay, dataOptimizerConfiguration.UserProcessorDailyRunTimeSeconds, nextScheduledPauseTimeUTC);
            }
        }

        // Abort if waiting for connectivity restoration. Pause first so that the wait is not a busy loop.
        if (stateMachine.CurrentState == State.Waiting)
        {
            await PauseBeforeRecheckAsync();
            continue;
        }

        // Abort if the configured execution interval has not elapsed since the last time this method was executed.
        // NOTE(review): this call sits outside the try block, so any exception it throws leaves the loop unhandled - confirm that is intended.
        var userProcessorInfo = await processorTracker.GetUserProcessorInfoAsync();
        if (userProcessorInfo.EntitiesHaveBeenProcessed && !dateTimeHelper.TimeIntervalHasElapsed((DateTime)userProcessorInfo.EntitiesLastProcessedUtc, DateTimeIntervalType.Seconds, dataOptimizerConfiguration.UserProcessorExecutionIntervalSeconds))
        {
            // Pause before re-checking so that the processor tracker is not queried in a tight loop.
            await PauseBeforeRecheckAsync();
            continue;
        }

        try
        {
            logger.Trace($"Started iteration of {CurrentClassName}.{methodName}");

            using (var cancellationTokenSource = new CancellationTokenSource())
            {
                var engageExecutionThrottle = true;
                var processorTrackingInfoUpdated = false;
                DbUsersLastQueriedUtc = DateTime.UtcNow;

                // Initialize object caches.
                if (dbUserObjectCache.IsInitialized == false)
                {
                    await dbUserObjectCache.InitializeAsync(adapterContext, Databases.AdapterDatabase);
                }
                if (dbUserTObjectCache.IsInitialized == false)
                {
                    await dbUserTObjectCache.InitializeAsync(optimizerContext, Databases.OptimizerDatabase);
                }

                if (dbUserObjectCache.Any())
                {
                    // Values describing the last DbUser for which a new DbUserT was created in this iteration; they remain null when none was created.
                    #nullable enable
                    long? adapterDbLastId = null;
                    string? adapterDbLastGeotabId = null;
                    DateTime? adapterDbLastRecordCreationTimeUtc = null;
                    #nullable disable

                    // Get the subset of DbUsers that were added or changed since the last time DbUsers were processed.
                    // NOTE(review): this cast is not guarded by EntitiesHaveBeenProcessed - confirm EntitiesLastProcessedUtc always has a value here.
                    var changedSince = (DateTime)userProcessorInfo.EntitiesLastProcessedUtc;
                    var changedDbUsers = await dbUserObjectCache.GetObjectsAsync(changedSince);
                    if (changedDbUsers.Any())
                    {
                        engageExecutionThrottle = changedDbUsers.Count < ThrottleEngagingBatchRecordCount;
                        var dbUserTsToPersist = new List<DbUserT>();

                        // Iterate through the list of added/changed DbUsers.
                        foreach (var changedDbUser in changedDbUsers)
                        {
                            // Try to get the DbUserT that corresponds with the DbUser.
                            var dbUserT = await dbUserTObjectCache.GetObjectAsync(changedDbUser.GeotabId);
                            if (dbUserT == null)
                            {
                                // The DbUserT doesn't yet exist. Create a new one.
                                dbUserT = dbUserDbUserTEntityMapper.CreateEntity(changedDbUser);
                                dbUserTsToPersist.Add(dbUserT);
                                adapterDbLastId = changedDbUser.id;
                                adapterDbLastGeotabId = changedDbUser.GeotabId;
                                adapterDbLastRecordCreationTimeUtc = changedDbUser.RecordLastChangedUtc;
                            }
                            else
                            {
                                // Update the existing DbUserT.
                                dbUserDbUserTEntityMapper.UpdateEntity(dbUserT, changedDbUser);
                                dbUserTsToPersist.Add(dbUserT);
                            }
                        }

                        // Persist changes to database.
                        using (var optimizerUOW = optimizerContext.CreateUnitOfWork(Databases.OptimizerDatabase))
                        {
                            try
                            {
                                // DbUserT:
                                await dbUserTEntityPersister.PersistEntitiesToDatabaseAsync(optimizerContext, dbUserTsToPersist, cancellationTokenSource, Logging.LogLevel.Info);

                                // DbOProcessorTracking:
                                await processorTracker.UpdateDbOProcessorTrackingRecord(optimizerContext, DataOptimizerProcessor.UserProcessor, DbUsersLastQueriedUtc, adapterDbLastId, adapterDbLastRecordCreationTimeUtc, adapterDbLastGeotabId);

                                // Commit transactions:
                                await optimizerUOW.CommitAsync();
                                processorTrackingInfoUpdated = true;
                            }
                            catch (Exception)
                            {
                                await optimizerUOW.RollBackAsync();
                                throw;
                            }
                        }

                        // Force the DbUserT cache to be updated so that the changes are immediately available to other consumers.
                        await dbUserTObjectCache.UpdateAsync(true);
                    }
                    else
                    {
                        logger.Debug($"There are no new or changed records in the {adapterDatabaseObjectNames.DbUserTableName} table in the {adapterDatabaseObjectNames.AdapterDatabaseNickname} database since the last check.");
                    }
                }
                else
                {
                    logger.Debug($"No records were returned from the {adapterDatabaseObjectNames.DbUserTableName} table in the {adapterDatabaseObjectNames.AdapterDatabaseNickname} database.");
                }

                // Update processor tracking info if not already done.
                if (processorTrackingInfoUpdated == false)
                {
                    using (var optimizerUOW = optimizerContext.CreateUnitOfWork(Databases.OptimizerDatabase))
                    {
                        try
                        {
                            await processorTracker.UpdateDbOProcessorTrackingRecord(optimizerContext, DataOptimizerProcessor.UserProcessor, DbUsersLastQueriedUtc, null, null, null);
                            await optimizerUOW.CommitAsync();
                        }
                        catch (Exception)
                        {
                            await optimizerUOW.RollBackAsync();
                            throw;
                        }
                    }
                }

                // If necessary, add a delay to implement the configured execution interval.
                if (engageExecutionThrottle == true)
                {
                    var delayTimeSpan = TimeSpan.FromSeconds(dataOptimizerConfiguration.UserProcessorExecutionIntervalSeconds);
                    logger.Info($"{CurrentClassName} pausing for {delayTimeSpan} because fewer than {ThrottleEngagingBatchRecordCount} records were processed during the current execution interval.");
                    await Task.Delay(delayTimeSpan, stoppingToken);
                }
            }

            logger.Trace($"Completed iteration of {CurrentClassName}.{methodName}");
        }
        catch (OperationCanceledException operationCanceledException)
        {
            string errorMessage = $"{CurrentClassName} process cancelled.";
            logger.Warn(errorMessage);
            // Keep the original exception as the inner exception so that the cancellation details are not lost.
            throw new Exception(errorMessage, operationCanceledException);
        }
        catch (AdapterDatabaseConnectionException databaseConnectionException)
        {
            HandleException(databaseConnectionException, NLogLogLevelName.Error, DefaultErrorMessagePrefix);
        }
        catch (OptimizerDatabaseConnectionException optimizerDatabaseConnectionException)
        {
            HandleException(optimizerDatabaseConnectionException, NLogLogLevelName.Error, DefaultErrorMessagePrefix);
        }
        catch (Exception ex)
        {
            // If an exception hasn't been handled to this point, log it and kill the process.
            HandleException(ex, NLogLogLevelName.Fatal, DefaultErrorMessagePrefix);
        }
    }

    logger.Trace($"End {CurrentClassName}.{methodName}");
}
/// <summary>
/// Logs the supplied <paramref name="exception"/> via the exception helper and then reacts to it: an <see cref="AdapterDatabaseConnectionException"/> or <see cref="OptimizerDatabaseConnectionException"/> puts the state machine into <see cref="State.Waiting"/> with the matching <see cref="StateReason"/>, and a <paramref name="logLevel"/> of <see cref="NLogLogLevelName.Fatal"/> kills the current process.
/// </summary>
/// <param name="exception">The <see cref="Exception"/> to log and evaluate.</param>
/// <param name="logLevel">The <see cref="NLogLogLevelName"/> to log at; <see cref="NLogLogLevelName.Fatal"/> additionally terminates the process.</param>
/// <param name="errorMessagePrefix">Text passed to the exception helper as the start of the logged error message.</param>
void HandleException(Exception exception, NLogLogLevelName logLevel, string errorMessagePrefix)
{
    exceptionHelper.LogException(exception, logLevel, errorMessagePrefix);

    // Connectivity failures move the state machine into the Waiting state; the adapter check takes precedence.
    switch (exception)
    {
        case AdapterDatabaseConnectionException _:
            stateMachine.SetState(State.Waiting, StateReason.AdapterDatabaseNotAvailable);
            break;
        case OptimizerDatabaseConnectionException _:
            stateMachine.SetState(State.Waiting, StateReason.OptimizerDatabaseNotAvailable);
            break;
    }

    // Anything below Fatal leaves the process running.
    if (logLevel != NLogLogLevelName.Fatal)
    {
        return;
    }

    System.Diagnostics.Process.GetCurrentProcess().Kill();
}
/// <summary>
/// Starts the current <see cref="UserProcessor"/> instance. The optimizer environment is validated against the existing processor tracking records, and this processor's tracking record is updated with the optimizer version and machine name in its own unit of work. The background loop is only started when <c>EnableUserProcessor</c> is set in the configuration; otherwise a warning is logged.
/// </summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> passed on to the base implementation.</param>
/// <returns>A <see cref="Task"/> representing the start-up work.</returns>
public override async Task StartAsync(CancellationToken cancellationToken)
{
    // MethodBase.GetCurrentMethod() is intentionally avoided: inside an async method it resolves to the
    // compiler-generated state machine's MoveNext method, which makes the trace output misleading.
    logger.Trace($"Begin {CurrentClassName}.{nameof(StartAsync)}");

    var dbOProcessorTrackings = await processorTracker.GetDbOProcessorTrackingListAsync();
    optimizerEnvironment.ValidateOptimizerEnvironment(dbOProcessorTrackings, DataOptimizerProcessor.UserProcessor);

    using (var optimizerUOW = optimizerContext.CreateUnitOfWork(Databases.OptimizerDatabase))
    {
        try
        {
            await processorTracker.UpdateDbOProcessorTrackingRecord(optimizerContext, DataOptimizerProcessor.UserProcessor, optimizerEnvironment.OptimizerVersion.ToString(), optimizerEnvironment.OptimizerMachineName);
            await optimizerUOW.CommitAsync();
        }
        catch (Exception)
        {
            // Undo the partial update before letting the failure propagate.
            await optimizerUOW.RollBackAsync();
            throw;
        }
    }

    // Only start this service if it has been configured to be enabled.
    if (dataOptimizerConfiguration.EnableUserProcessor == true)
    {
        logger.Info($"******** STARTING SERVICE: {AssemblyName}.{CurrentClassName} (v{AssemblyVersion})");
        await base.StartAsync(cancellationToken);
    }
    else
    {
        logger.Warn($"******** WARNING - SERVICE DISABLED: The {AssemblyName}.{CurrentClassName} service has not been enabled and will NOT be started.");
    }
}
/// <summary>
/// Stops the current <see cref="UserProcessor"/> instance: writes the trace and "stopped service" log entries, then hands over to the base implementation.
/// </summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> passed on to the base implementation.</param>
/// <returns>The <see cref="Task"/> returned by the base implementation.</returns>
public override Task StopAsync(CancellationToken cancellationToken)
{
    MethodBase currentMethod = MethodBase.GetCurrentMethod();
    string declaringTypeName = currentMethod.ReflectedType.Name;
    logger.Trace($"Begin {declaringTypeName}.{currentMethod.Name}");

    string serviceIdentity = $"{AssemblyName}.{CurrentClassName} (v{AssemblyVersion})";
    logger.Info($"******** STOPPED SERVICE: {serviceIdentity} ********");

    Task stopTask = base.StopAsync(cancellationToken);
    return stopTask;
}
}
}