using System.ComponentModel.DataAnnotations;
using System.Data;
using Microsoft.Data.SqlClient;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Options = RAIC.Extensions.Configuration.EntityFrameworkCore.SqlServer.SqlServerNotificationConfigurationReloaderOptions;

namespace RAIC.Extensions.Configuration.EntityFrameworkCore.SqlServer;

/// <summary>
/// Options consumed by the SQL Server notification based configuration reloader in this file.
/// </summary>
public class SqlServerNotificationConfigurationReloaderOptions // must be public because it appears in public method signatures (various ServiceCollectionExtension methods)
{
    /// <summary>Connection string passed to <c>SqlDependency.Start</c> / <c>SqlDependency.Stop</c>.</summary>
    [Required]
    public required string ConnectionString { get; set; }

    /// <summary>
    /// <c>SqlError.Number</c> values that the reloader treats as transient: when the inner
    /// <see cref="SqlException"/> reports any of them, the reloader logs a warning and attempts to reconnect
    /// instead of failing.
    /// </summary>
    [Required]
    public required HashSet<int> TransientErrors { get; set; } = [2, 53, 121, 10060, 11001];
}

// NOTE(review): the generic type-parameter list of this class, the type argument of ISettingsDbContext<...> and the
// type argument of SqlQueryRaw<...> in GetChanges were not recoverable from the copy of the file this was written
// against and have been reconstructed from usage - confirm them against the rest of the project.

/// <summary>
/// Background service that registers a <see cref="SqlDependency"/> on the settings query and, each time a
/// notification arrives, reads the changed rows via SQL Server change tracking and pushes them into the
/// configuration provider.
/// </summary>
internal class SqlServerNotificationConfigurationReloader<TDbContext, TSetting> : Microsoft.Extensions.Hosting.BackgroundService
    where TDbContext : DbContext, ISettingsDbContext<DbSet<TSetting>>
    where TSetting : class, ISetting
{
    private static string? _changesQueryTemplate;

    private readonly TDbContext _dbContext;
    private readonly IEntityFrameworkCoreDbSetConfigurationProvider _configProvider;
    private readonly IDisposable? _onChangeHandler;
    private readonly ILogger? _logger;
    private Options _options;

    // Connection string the dependency listener was started with; null while no listener is running.
    // Remembered so that Stop is paired with the same value even if the options change afterwards.
    private string? _listenerConnectionString;

    /// <summary>Creates the reloader and subscribes to option changes.</summary>
    /// <param name="dbContext">Context exposing the settings query and the database connection.</param>
    /// <param name="configProvider">Provider that receives the key/value updates.</param>
    /// <param name="options">Monitor supplying the current options; later changes replace the cached value.</param>
    /// <param name="logger">Optional logger; when null nothing is logged.</param>
    public SqlServerNotificationConfigurationReloader(
        TDbContext dbContext,
        IEntityFrameworkCoreDbSetConfigurationProvider configProvider,
        IOptionsMonitor<Options> options,
        ILogger<SqlServerNotificationConfigurationReloader<TDbContext, TSetting>>? logger = null)
    {
        _dbContext = dbContext;
        _configProvider = configProvider;
        _options = options.CurrentValue;
        _onChangeHandler = options.OnChange(opt => _options = opt);
        _logger = logger;
    }

    /// <summary>
    /// Runs the listen/update loop until the stopping token is signalled, a non-transient exception occurs
    /// (which is rethrown), or reconnecting after a transient failure is not possible.
    /// </summary>
    /// <param name="stoppingToken">Signals that the host is shutting down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        try
        {
            var settingsQuery = await Initialise(stoppingToken);

            do
            {
                try
                {
                    // NOTE(review): the change-tracking baseline is re-read after every registration, so changes
                    // committed between UpdateConfiguration's query and the next registration look like they
                    // would not be picked up - confirm whether that window matters.
                    do
                    {
                        var notificationCompletion = await ListenForNotifications(settingsQuery, stoppingToken);
                        using var tcsCancelRegistration = stoppingToken.Register(() => notificationCompletion.TrySetCanceled());
                        var lastVersion = await GetChangeTrackingVersion(stoppingToken);
                        await notificationCompletion.Task;
                        UpdateConfiguration(lastVersion);
                    }
                    while (true); // each notification is one-and-done, so must keep re-registering for notifications indefinitely
                }
                catch (SqlException e) when (e.InnerException is SqlException inner && _options.TransientErrors.Overlaps(inner.Errors.OfType<SqlError>().Select(error => error.Number)))
                {
                    _logger?.LogWarning(e, "Transient exception during notification setup process or the processing of notifications");
                }
                catch (SqlNotificationException e) // only ever thrown while waiting for notification task to complete, must (?) be dropped connection of some sort
                {
                    _logger?.LogWarning(e, "Transient exception while listening for notifications");
                }
                catch (Exception e) when (e is not OperationCanceledException)
                {
                    _logger?.LogError(e, "Exception while setting up, listening for or processing notifications");
                    throw;
                }
            }
            while (await TryReconnect(stoppingToken));

            _logger?.LogWarning("Giving up listening for notifications on query '{query}' because reconnection failed. Configuration updates from database will no longer occur", settingsQuery);
        }
        catch (OperationCanceledException e)
        {
            _logger?.LogInformation(e, "Exiting due to signal from stopping token");
        }
        finally
        {
            // Pair the Start made in Initialise with a Stop on every exit path (give-up, cancellation, failure).
            StopListener();
        }
    }

    /// <summary>
    /// Opens the context's connection, starts the dependency listener and produces the SQL text of the settings query.
    /// </summary>
    /// <param name="stoppingToken">Cancels opening the connection.</param>
    /// <returns>The SQL text on which notifications are registered.</returns>
    private async Task<string> Initialise(CancellationToken stoppingToken)
    {
        // The open is started first and awaited last so that it overlaps with the synchronous work below.
        var openTask = _dbContext.Database.OpenConnectionAsync(stoppingToken);
        var connectionString = _options.ConnectionString;

        try
        {
            SqlDependency.Start(connectionString);
            _listenerConnectionString = connectionString;
        }
        catch (Exception e)
        {
            _logger?.LogError(e, "Exception while attempting to start dependency change listener");
            throw;
        }

        var settingsQuery = _dbContext.Settings.ToQueryString();
        _logger?.LogInformation(@"Query upon which notifications will be listened for: ""{query}""", settingsQuery);

        await openTask;
        return settingsQuery;
    }

    /// <summary>
    /// Stops the dependency listener started by <see cref="Initialise"/>, if any. Failures are logged rather
    /// than thrown because this runs from a <c>finally</c> block.
    /// </summary>
    private void StopListener()
    {
        var connectionString = _listenerConnectionString;
        if (connectionString is null)
        {
            return;
        }

        _listenerConnectionString = null;

        try
        {
            SqlDependency.Stop(connectionString);
        }
        catch (Exception e)
        {
            _logger?.LogWarning(e, "Exception while attempting to stop dependency change listener");
        }
    }

    /// <summary>
    /// Executes the settings query with a fresh <see cref="SqlDependency"/> attached.
    /// </summary>
    /// <param name="settingsQuery">SQL text to register the dependency on.</param>
    /// <param name="stoppingToken">Cancels execution of the registration command.</param>
    /// <returns>
    /// A completion source that is completed when a data-change notification arrives and faulted with a
    /// <see cref="SqlNotificationException"/> when the notification reports an error.
    /// </returns>
    private async Task<TaskCompletionSource> ListenForNotifications(string settingsQuery, CancellationToken stoppingToken)
    {
        // Can't reuse the command built here, as once it gets associated with a SqlDependency it can't be used with another,
        // and SqlDependencies can't be used more than once either... hence there's no point "preparing" the command either
        using var command = new SqlCommand()
        {
            Connection = (SqlConnection)_dbContext.Database.GetDbConnection(),
            CommandType = CommandType.Text,
            CommandText = settingsQuery,
        };
        var dependency = new SqlDependency(command);

        // Continuations are forced off the completing thread so that the awaiting loop does not run inline on
        // whichever thread raises OnChange.
        var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        dependency.OnChange += OnChange;

        try
        {
            await command.ExecuteNonQueryAsync(stoppingToken);
        }
        catch (Exception e) when (e is not OperationCanceledException)
        {
            _logger?.LogError(e, "Exception while attempting to register query dependency");
            throw;
        }

        return tcs;

        void OnChange(object sender, SqlNotificationEventArgs args)
        {
            dependency.OnChange -= OnChange;

            switch (args.Info)
            {
                case SqlNotificationInfo.Insert:
                case SqlNotificationInfo.Update:
                case SqlNotificationInfo.Delete:
                case SqlNotificationInfo.Expired:
                case SqlNotificationInfo.Truncate:
                    tcs.TrySetResult();
                    return;

                case SqlNotificationInfo.Error:
                    _logger?.LogWarning("SqlDependency '{info}' from {type}@{source}", args.Info, args.Type, args.Source);
                    tcs.TrySetException(new SqlNotificationException($"SqlDependency {args.Info} from {args.Type}@{args.Source}"));
                    return;

                default:
                    // NOTE(review): the handler has already been removed and the task is not completed here, so
                    // the caller keeps waiting until the stopping token fires - confirm this is intended.
                    _logger?.LogWarning("Ignoring '{info}' from {type}@{source} received from SqlDependency", args.Info, args.Type, args.Source);
                    return;
            }
        }
    }

    /// <summary>Reads the database's current change tracking version.</summary>
    /// <param name="stoppingToken">Cancels the query.</param>
    /// <returns>The value of <c>CHANGE_TRACKING_CURRENT_VERSION()</c>.</returns>
    private async Task<long> GetChangeTrackingVersion(CancellationToken stoppingToken)
    {
        var query = "SELECT CHANGE_TRACKING_CURRENT_VERSION() as value";
        return await _dbContext.Database.SqlQueryRaw<long>(query).SingleAsync(stoppingToken);
    }

    /// <summary>
    /// Applies every change recorded after <paramref name="lastVersion"/> to the configuration provider and
    /// triggers a reload if anything was removed or set to a different value.
    /// </summary>
    /// <param name="lastVersion">Change tracking version to read changes from.</param>
    private void UpdateConfiguration(long lastVersion)
    {
        var changed = false;

        foreach (var (key, value) in GetChanges(lastVersion))
        {
            if (value is null)
            {
                // A changed key without a value has no row left in the settings table.
                changed = true;
                _configProvider.Remove(key);
            }
            else if (!_configProvider.TryGet(key, out var oldValue) || !string.Equals(oldValue, value))
            {
                changed = true;
                _configProvider.Set(key, value);
            }
        }

        if (changed)
        {
            _configProvider.OnReload();
        }
    }

    /// <summary>
    /// Builds the query returning the key and current value of every setting changed after
    /// <paramref name="lastVersion"/>; the value is null when the key no longer has a row.
    /// </summary>
    /// <param name="lastVersion">Change tracking version passed to <c>CHANGETABLE</c>.</param>
    private IQueryable<(string key, string? value)> GetChanges(long lastVersion)
    {
        _changesQueryTemplate ??= GetQueryTemplate();

        return _dbContext.Database.SqlQueryRaw<TSetting>(_changesQueryTemplate, lastVersion)
            .Select(s => ValueTuple.Create(s.Key, string.Equals(s.Value, null) ? null : s.Value)); // hack to get EF to not throw if Value is null
    }

    /// <summary>
    /// Composes the <c>CHANGETABLE</c> query text from the mapped table, key column and value column of
    /// <typeparamref name="TSetting"/>. The <c>{0}</c> placeholder is left for the version parameter.
    /// </summary>
    /// <exception cref="InvalidOperationException">The setting type is not mapped to a table.</exception>
    private string GetQueryTemplate()
    {
        var entityType = _dbContext.Model.FindEntityType(typeof(TSetting))!;

        var tableName = entityType.GetTableName()
            ?? throw new InvalidOperationException($"Entity type '{entityType.Name}' is not mapped to a table");

        // The schema is null when none is configured; emitting "[]." in that case would produce invalid SQL,
        // so the table is then referenced by name only.
        var schema = entityType.GetSchema();
        var schemaQualifiedTableName = schema is null
            ? QuoteIdentifier(tableName)
            : $"{QuoteIdentifier(schema)}.{QuoteIdentifier(tableName)}";

        var keyColumn = QuoteIdentifier(entityType.FindPrimaryKey()!.Properties.Single().GetColumnName());
        var valueColumn = QuoteIdentifier(entityType.FindProperty(nameof(ISetting.Value))!.GetColumnName());

        var changesQueryTemplate = $$"""
            SELECT ct.{{keyColumn}}, s.{{valueColumn}}
            FROM CHANGETABLE( CHANGES {{schemaQualifiedTableName}}, {0} ) AS ct
            LEFT JOIN {{schemaQualifiedTableName}} AS s ON ct.{{keyColumn}} = s.{{keyColumn}}
            """;

        return changesQueryTemplate;
    }

    /// <summary>Wraps an identifier in square brackets, doubling any closing bracket it contains.</summary>
    private static string QuoteIdentifier(string identifier) => $"[{identifier.Replace("]", "]]")}]";

    /// <summary>Checks whether the database is reachable and, if so, opens the context's connection again.</summary>
    /// <param name="stoppingToken">Cancels the connectivity check and the open.</param>
    /// <returns>True when the connection was re-opened; false when the database cannot be reached.</returns>
    private async Task<bool> TryReconnect(CancellationToken stoppingToken)
    {
        // Unlike the postgres version of this there's no retry logic here - SqlClient does its own retries internally so it is not needed (?)
        if (await _dbContext.Database.CanConnectAsync(stoppingToken))
        {
            await _dbContext.Database.OpenConnectionAsync(stoppingToken);
            return true;
        }

        return false;
    }

    /*
    Query here is throwing exception for some reason. Think it is getting confused about key column being aliased as value perhaps?
    From the logs:

    warn: RelationalEventId.QueryPossibleUnintendedUseOfEqualsWarning[20501] (Microsoft.EntityFrameworkCore.Query)
        Possible unintended use of method 'Equals' for arguments 's.Value' and 's0.Key' of different types in a query.This comparison will always return false.
    ...
    Generated query execution expression:
    'queryContext => SingleQueryingEnumerable.Create>(
        relationalQueryContext: (RelationalQueryContext)queryContext,
        relationalCommandResolver: parameters => [LIFTABLE Constant: RelationalCommandCache.QueryExpression(
            Client Projections:
                0 -> 0
                1 -> 1
            SELECT s.Value, CASE
                WHEN s0.Value == NULL THEN NULL
                ELSE s0.Value
            END
            FROM SELECT [Key] AS Value FROM CHANGETABLE( CHANGES [dbo].[Settings], {0} )
            LEFT JOIN dbo.Settings AS s0 ON CAST(0 AS bit)) | Resolver: c => new RelationalCommandCache(

    What's up with that JOIN .. ON clause?!
    */
    /// <summary>
    /// Unused LINQ-join variant of <see cref="GetChanges"/>, kept for reference; see the comment above for
    /// the problem observed with it.
    /// </summary>
    private IQueryable<(string key, string? value)> GetChangesNotWorking(long lastVersion)
    {
        var entityType = _dbContext.Model.FindEntityType(typeof(TSetting))!;
        var schemaQualifiedTableName = $"[{entityType.GetSchema()}].[{entityType.GetTableName()}]";
        var keyColumn = $"[{entityType.FindPrimaryKey()!.Properties.Single().GetColumnName()}]";
        var changesQueryTemplate = $"SELECT {keyColumn} AS Value FROM CHANGETABLE( CHANGES {schemaQualifiedTableName}, {{0}} )";

        var query = from changedKey in _dbContext.Database.SqlQueryRaw<string>(changesQueryTemplate, lastVersion)
                    join s in _dbContext.Settings on changedKey equals s.Key into joined
                    from j in joined.DefaultIfEmpty()
                    select ValueTuple.Create(changedKey, string.Equals(j.Value, null) ? null : j.Value);

        return query;
    }

    /// <summary>Disposes the background service and the options change subscription.</summary>
    public override void Dispose()
    {
        base.Dispose();
        _onChangeHandler?.Dispose();
    }

    /// <summary>Raised through the notification task when a <see cref="SqlDependency"/> reports an error.</summary>
    private class SqlNotificationException : Exception
    {
        public SqlNotificationException(string? message)
            : base(message)
        {
        }

        public SqlNotificationException(string? message, Exception? innerException)
            : base(message, innerException)
        {
        }
    }
}