0
0
Files
Extensions.Configuration.En…/RAIC.Extensions.Configuration.EntityFrameworkCore.SqlServer/SqlServerNotificationConfigurationReloader.cs
2026-01-07 11:52:07 +11:00

293 lines
12 KiB
C#

using System.ComponentModel.DataAnnotations;
using System.Data;
using Microsoft.Data.SqlClient;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Options = RAIC.Extensions.Configuration.EntityFrameworkCore.SqlServer.SqlServerNotificationConfigurationReloaderOptions;
namespace RAIC.Extensions.Configuration.EntityFrameworkCore.SqlServer;
public class SqlServerNotificationConfigurationReloaderOptions // must be public because it appears in public method signatures (various ServiceCollectionExtension methods)
{
[Required]
public required string ConnectionString { get; set; }
[Required]
public required HashSet<int> TransientErrors { get; set; } = [2, 53, 121, 10060, 11001];
}
internal class SqlServerNotificationConfigurationReloader<TDbContext, TSetting> : Microsoft.Extensions.Hosting.BackgroundService
where TDbContext : DbContext, ISettingsDbContext<IQueryable<TSetting>>
where TSetting : class, ISetting
{
private static string? _changesQueryTemplate;
private readonly TDbContext _dbContext;
private readonly IEntityFrameworkCoreDbSetConfigurationProvider _configProvider;
private readonly IDisposable? _onChangeHandler;
private readonly ILogger? _logger;
private Options _options;
public SqlServerNotificationConfigurationReloader(TDbContext dbContext, IEntityFrameworkCoreDbSetConfigurationProvider configProvider, IOptionsMonitor<Options> options, ILogger<SqlServerNotificationConfigurationReloader<TDbContext, TSetting>>? logger = null)
{
_dbContext = dbContext;
_configProvider = configProvider;
_options = options.CurrentValue;
_onChangeHandler = options.OnChange(opt => _options = opt);
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
try
{
var settingsQuery = await Initialise(stoppingToken);
do
{
try
{
do
{
var notificationCompletion = await ListenForNotifications(settingsQuery, stoppingToken);
using var tcsCancelRegistration = stoppingToken.Register(() => notificationCompletion.TrySetCanceled());
var lastVersion = await GetChangeTrackingVersion(stoppingToken);
await notificationCompletion.Task;
UpdateConfiguration(lastVersion);
}
while (true); // each notification is one-and-done, so must keep re-regesitering for notifications indefinitely
}
catch (SqlException e) when (e.InnerException is SqlException inner && _options.TransientErrors.Overlaps(inner.Errors.OfType<SqlError>().Select(e => e.Number)))
{
_logger?.LogWarning(e, "Transient exception during notification setup process or the processing of notifications");
}
catch (SqlNotificationException e) // only ever thown while waiting for notification task to complete, must (?) be dropped connection of some sort
{
_logger?.LogWarning(e, "Transient exception while listening for notifications");
}
catch (Exception e) when (e is not OperationCanceledException)
{
_logger?.LogError(e, "Exception while setting up, listening for or processing notifications");
throw;
}
}
while (await TryReconnect(stoppingToken));
_logger?.LogWarning("Giving up listening for notifications on query '{query}' because reconnection failed. Configuration updates from database will no longer occur", settingsQuery);
SqlDependency.Stop(_options.ConnectionString);
}
catch (OperationCanceledException e)
{
_logger?.LogInformation(e, "Exiting due to signal from stopping token");
}
}
private async Task<string> Initialise(CancellationToken stoppingToken)
{
var openTask = _dbContext.Database.OpenConnectionAsync(stoppingToken);
try
{
SqlDependency.Start(_options.ConnectionString);
}
catch (Exception e)
{
_logger?.LogError(e, "Exception while attempting to start dependency change listener");
throw;
}
var settingsQuery = _dbContext.Settings.ToQueryString();
_logger?.LogInformation(@"Query upon which notifications will be listened for: ""{query}""", settingsQuery);
await openTask;
return settingsQuery;
}
private async Task<TaskCompletionSource> ListenForNotifications(string settingsQuery, CancellationToken stoppingToken)
{
// Can't reuse the command built here, as once it gets associated with a SqlDependency it can't be used with another,
// and SqlDependencies can't be used more than once either... hence there's no point "preparing" the command either
using var command = new SqlCommand()
{
Connection = (SqlConnection)_dbContext.Database.GetDbConnection(),
CommandType = CommandType.Text,
CommandText = settingsQuery,
};
var dependency = new SqlDependency(command);
var tcs = new TaskCompletionSource();
dependency.OnChange += OnChange;
try
{
await command.ExecuteNonQueryAsync(stoppingToken);
}
catch (Exception e) when (e is not OperationCanceledException)
{
_logger?.LogError(e, "Exception while attempting to register query dependency");
throw;
}
return tcs;
void OnChange(object sender, SqlNotificationEventArgs args)
{
dependency.OnChange -= OnChange;
switch (args.Info)
{
case SqlNotificationInfo.Insert:
case SqlNotificationInfo.Update:
case SqlNotificationInfo.Delete:
case SqlNotificationInfo.Expired:
case SqlNotificationInfo.Truncate:
tcs.TrySetResult();
return;
case SqlNotificationInfo.Error:
_logger?.LogWarning("SqlDependency '{info}' from {type}@{source}", args.Info, args.Type, args.Source);
tcs.TrySetException(new SqlNotificationException($"SqlDependency {args.Info} from {args.Type}@{args.Source}"));
return;
default:
_logger?.LogWarning("Ignoring '{info}' from {type}@{source} received from SqlDependency", args.Info, args.Type, args.Source);
return;
}
}
}
private async Task<long> GetChangeTrackingVersion(CancellationToken stoppingToken)
{
var query = "SELECT CHANGE_TRACKING_CURRENT_VERSION() as value";
return await _dbContext.Database.SqlQueryRaw<long>(query).SingleAsync(stoppingToken);
}
private void UpdateConfiguration(long lastVersion)
{
var changed = false;
foreach (var (key, value) in GetChanges(lastVersion))
{
if (value is null)
{
changed = true;
_configProvider.Remove(key);
}
else if (!_configProvider.TryGet(key, out var oldValue) || !string.Equals(oldValue, value))
{
changed = true;
_configProvider.Set(key, value);
}
}
if (changed) _configProvider.OnReload();
}
private IQueryable<(string key, string? value)> GetChanges(long lastVersion)
{
_changesQueryTemplate ??= GetQueryTemplate();
return _dbContext.Database.SqlQueryRaw<TSetting>(_changesQueryTemplate, lastVersion)
.Select(s => ValueTuple.Create(s.Key, string.Equals(s.Value, null) ? null : s.Value)); // hack to get EF to not throw if Value is null
}
private string GetQueryTemplate()
{
var entityType = _dbContext.Model.FindEntityType(typeof(TSetting))!;
var schemaQualifiedTableName = $"[{entityType.GetSchema()}].[{entityType.GetTableName()}]";
var keyColumn = $"[{entityType.FindPrimaryKey()!.Properties.Single().GetColumnName()}]";
var valueColumn = $"[{entityType.FindProperty(nameof(ISetting.Value))!.GetColumnName()}]";
var changesQueryTemplate = $$"""
SELECT ct.{{keyColumn}}, s.{{valueColumn}}
FROM CHANGETABLE( CHANGES {{schemaQualifiedTableName}}, {0} ) AS ct
LEFT JOIN {{schemaQualifiedTableName}} AS s ON ct.{{keyColumn}} = s.{{keyColumn}}
""";
return changesQueryTemplate;
}
private async Task<bool> TryReconnect(CancellationToken stoppingToken)
{
// Unlike the postgres version of this there's no retry logic here - SqlClient does its own retries internally so it is not needed (?)
if (await _dbContext.Database.CanConnectAsync(stoppingToken))
{
await _dbContext.Database.OpenConnectionAsync(stoppingToken);
return true;
}
return false;
}
/*
Query here is throwing exception for some reason. Think it is getting confused about key column being aliased as value perhaps?
From the logs:
warn: RelationalEventId.QueryPossibleUnintendedUseOfEqualsWarning[20501] (Microsoft.EntityFrameworkCore.Query)
Possible unintended use of method 'Equals' for arguments 's.Value' and 's0.Key' of different types in a query.This comparison will always return false.
...
Generated query execution expression:
'queryContext => SingleQueryingEnumerable.Create<ValueTuple<string, string>>(
relationalQueryContext: (RelationalQueryContext)queryContext,
relationalCommandResolver: parameters => [LIFTABLE Constant: RelationalCommandCache.QueryExpression(
Client Projections:
0 -> 0
1 -> 1
SELECT s.Value, CASE
WHEN s0.Value == NULL THEN NULL
ELSE s0.Value
END
FROM SELECT [Key] AS Value FROM CHANGETABLE( CHANGES [dbo].[Settings], {0} )
LEFT JOIN dbo.Settings AS s0 ON CAST(0 AS bit)) | Resolver: c => new RelationalCommandCache(
What's up with that JOIN .. ON clause?!
*/
private IQueryable<(string key, string? value)> GetChangesNotWorking(long lastVersion)
{
var entityType = _dbContext.Model.FindEntityType(typeof(TSetting))!;
var schemaQualifiedTableName = $"[{entityType.GetSchema()}].[{entityType.GetTableName()}]";
var keyColumn = $"[{entityType.FindPrimaryKey()!.Properties.Single().GetColumnName()}]";
var changesQueryTemplate = $"SELECT {keyColumn} AS Value FROM CHANGETABLE( CHANGES {schemaQualifiedTableName}, {{0}} )";
var query = from changedKey in _dbContext.Database.SqlQueryRaw<string>(changesQueryTemplate, lastVersion)
join s in _dbContext.Settings
on changedKey equals s.Key into joined
from j in joined.DefaultIfEmpty()
select ValueTuple.Create(changedKey, string.Equals(j.Value, null) ? null : j.Value);
return query;
}
public override void Dispose()
{
base.Dispose();
_onChangeHandler?.Dispose();
}
private class SqlNotificationException : Exception
{
public SqlNotificationException(string? message) : base(message)
{
}
public SqlNotificationException(string? message, Exception? innerException) : base(message, innerException)
{
}
}
}