You've already forked Extensions.Configuration.EntityFrameworkCore
266 lines
10 KiB
C#
266 lines
10 KiB
C#
using System.ComponentModel.DataAnnotations;
|
|
using System.Data;
|
|
using Microsoft.Data.SqlClient;
|
|
using Microsoft.EntityFrameworkCore;
|
|
using Microsoft.Extensions.Logging;
|
|
using Microsoft.Extensions.Options;
|
|
using Options = RAIC.Extensions.Configuration.EntityFrameworkCore.SqlServer.SqlServerNotificationConfigurationReloaderOptions;
|
|
|
|
namespace RAIC.Extensions.Configuration.EntityFrameworkCore.SqlServer;
|
|
|
|
/// <summary>
/// Options consumed by the SQL Server notification based configuration reloader.
/// </summary>
/// <remarks>
/// Must be public because it appears in public method signatures
/// (various ServiceCollectionExtension methods).
/// </remarks>
public class SqlServerNotificationConfigurationReloaderOptions
{
    /// <summary>
    /// Connection string handed to <c>SqlDependency.Start</c> / <c>SqlDependency.Stop</c>.
    /// </summary>
    [Required]
    public required string ConnectionString { get; set; }
}
|
|
|
|
|
|
/// <summary>
/// Background service that registers a SQL Server query notification (<see cref="SqlDependency"/>)
/// on the settings query and, whenever a notification arrives, reads the changed rows through
/// SQL Server change tracking (<c>CHANGETABLE</c>) and pushes them into the configuration provider.
/// </summary>
/// <typeparam name="TDbContext">The context exposing the settings query.</typeparam>
/// <typeparam name="TSetting">The entity type holding one key/value setting.</typeparam>
internal class SqlServerNotificationConfigurationReloader<TDbContext, TSetting> : Microsoft.Extensions.Hosting.BackgroundService
    where TDbContext : DbContext, ISettingsDbContext<IQueryable<TSetting>>
    where TSetting : class, ISetting
{
    // Static field of a generic type: one cached template per closed (TDbContext, TSetting) pair.
    private static string? _changesQueryTemplate;

    private readonly TDbContext _dbContext;
    private readonly IEntityFrameworkCoreDbSetConfigurationProvider _configProvider;
    private readonly IDisposable? _onChangeHandler;
    private readonly ILogger? _logger;

    private Options _options;

    // The exact connection string SqlDependency.Start() was called with. _options can be swapped by the
    // options monitor at any time, and Stop() must be paired with the string that was used for Start().
    private string? _startedConnectionString;

    /// <summary>
    /// Creates the reloader and subscribes to option changes.
    /// </summary>
    /// <param name="dbContext">Context used for the notification query and the change-tracking queries.</param>
    /// <param name="configProvider">Provider whose data is updated when settings change.</param>
    /// <param name="options">Monitor supplying the current options; later changes replace the cached instance.</param>
    /// <param name="logger">Optional logger.</param>
    public SqlServerNotificationConfigurationReloader(TDbContext dbContext, IEntityFrameworkCoreDbSetConfigurationProvider configProvider, IOptionsMonitor<Options> options, ILogger<SqlServerNotificationConfigurationReloader<TDbContext, TSetting>>? logger = null)
    {
        _dbContext = dbContext;
        _configProvider = configProvider;
        _options = options.CurrentValue;
        _onChangeHandler = options.OnChange(opt => _options = opt);
        _logger = logger;
    }

    /// <summary>
    /// Starts the dependency listener, then listens for notifications until the service is stopped
    /// or reconnecting to the database fails. The dependency listener is always stopped on exit.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the host is shutting down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        try
        {
            var settingsQuery = await Initialise(stoppingToken);

            // Watermark of the last change-tracking version whose changes have been applied. It is carried
            // across notifications AND reconnects so that no committed change can fall into a gap.
            var lastVersion = await GetChangeTrackingVersion(stoppingToken);

            do
            {
                lastVersion = await ListenUntilFailure(settingsQuery, lastVersion, stoppingToken);
            }
            while (await ReconnectOnDependencyException(stoppingToken));

            _logger?.LogWarning("Giving up listening for notifications on query '{query}' because reconnection failed. Configuration updates from database will no longer occur", settingsQuery);
        }
        finally
        {
            // Runs on normal exit, on cancellation and on failure, so Start() is always balanced by Stop().
            StopDependencyListener();
        }
    }

    /// <summary>
    /// Repeatedly registers a notification, applies any changes committed since
    /// <paramref name="lastVersion"/>, and waits for the next notification.
    /// </summary>
    /// <param name="settingsQuery">SQL text of the settings query the notification is registered on.</param>
    /// <param name="lastVersion">Change-tracking version up to which changes have already been applied.</param>
    /// <param name="stoppingToken">Signalled when the host is shutting down.</param>
    /// <returns>The updated watermark at the moment listening failed.</returns>
    private async Task<long> ListenUntilFailure(string settingsQuery, long lastVersion, CancellationToken stoppingToken)
    {
        while (true) // each notification is one-and-done, so must keep re-registering indefinitely
        {
            var (taskCompletionSource, cancelTokenRegistration) = await ListenForNotifications(settingsQuery, stoppingToken);
            using (cancelTokenRegistration)
            {
                // Read the version BEFORE querying the changes: anything committed in between is simply
                // applied twice (harmless), whereas reading it afterwards could skip a change for good.
                var currentVersion = await GetChangeTrackingVersion(stoppingToken);

                try
                {
                    // Catch up after (re)registering. This applies the change that triggered the previous
                    // notification plus anything committed while no notification was registered.
                    UpdateConfiguration(lastVersion);
                    lastVersion = currentVersion;

                    await taskCompletionSource.Task;
                }
                catch (Exception e) when (e is not OperationCanceledException)
                {
                    _logger?.LogWarning(e, "Exception while listening for notifications on query '{query}'", settingsQuery);
                    return lastVersion;
                }
            }
        }
    }

    /// <summary>
    /// Stops the dependency listener if it was started. Never throws, because it runs in a
    /// <c>finally</c> block and must not mask the exception that is already propagating.
    /// </summary>
    private void StopDependencyListener()
    {
        if (_startedConnectionString is not { } connectionString)
        {
            return;
        }

        _startedConnectionString = null;

        try
        {
            SqlDependency.Stop(connectionString);
        }
        catch (Exception e)
        {
            _logger?.LogWarning(e, "Exception while attempting to stop dependency change listener");
        }
    }

    /// <summary>
    /// Starts the <see cref="SqlDependency"/> listener, builds the settings query text and opens the
    /// context's connection.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the host is shutting down.</param>
    /// <returns>The SQL text of the settings query.</returns>
    private async Task<string> Initialise(CancellationToken stoppingToken)
    {
        var connectionString = _options.ConnectionString;

        try
        {
            SqlDependency.Start(connectionString);
            _startedConnectionString = connectionString;
        }
        catch (Exception e)
        {
            _logger?.LogError(e, "Exception while attempting to start dependency change listener");
            throw;
        }

        var settingsQuery = _dbContext.Settings.ToQueryString();

        _logger?.LogInformation("Listening for notifications on query '{query}'", settingsQuery);

        // Opened only after the steps above, so the context is never used while an open is in flight
        // and no started-but-never-awaited task is left behind when Start() throws.
        await _dbContext.Database.OpenConnectionAsync(stoppingToken);

        return settingsQuery;
    }

    /// <summary>
    /// Registers a one-shot query notification for <paramref name="settingsQuery"/>.
    /// </summary>
    /// <param name="settingsQuery">SQL text to register the notification on.</param>
    /// <param name="stoppingToken">Cancels both the registration command and the returned task source.</param>
    /// <returns>
    /// A task source that completes when a change notification arrives, faults on an error notification
    /// and is cancelled with <paramref name="stoppingToken"/>; plus the token registration the caller
    /// must dispose.
    /// </returns>
    private async Task<(TaskCompletionSource, CancellationTokenRegistration)> ListenForNotifications(string settingsQuery, CancellationToken stoppingToken)
    {
        using var command = new SqlCommand()
        {
            Connection = (SqlConnection)_dbContext.Database.GetDbConnection(),
            CommandType = CommandType.Text,
            CommandText = settingsQuery,
        };

        // Asynchronous continuations: the awaiting code must not run inline inside the SqlDependency
        // callback or inside the CancellationTokenSource.Cancel() call.
        var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        var dependency = new SqlDependency(command);
        dependency.OnChange += OnChange;
        var tcsCancelRegistration = stoppingToken.Register(() => tcs.TrySetCanceled(stoppingToken));

        try
        {
            await command.ExecuteNonQueryAsync(stoppingToken);
        }
        catch (Exception e)
        {
            // The caller never receives the registration on this path, so release it here.
            tcsCancelRegistration.Dispose();

            if (e is not OperationCanceledException)
            {
                _logger?.LogError(e, "Exception while attempting to register query dependency");
            }

            throw;
        }

        return (tcs, tcsCancelRegistration);

        void OnChange(object sender, SqlNotificationEventArgs args)
        {
            switch (args.Info)
            {
                case SqlNotificationInfo.Insert:
                case SqlNotificationInfo.Update:
                case SqlNotificationInfo.Delete:
                case SqlNotificationInfo.Expired:
                case SqlNotificationInfo.Truncate:
                    tcs.TrySetResult();
                    return;
                case SqlNotificationInfo.Error:
                    _logger?.LogWarning("SqlDependency '{info}' from {type}@{source}", args.Info, args.Type, args.Source);
                    tcs.TrySetException(new InvalidOperationException($"SqlDependency {args.Info} from {args.Type}@{args.Source}"));
                    return;
                default:
                    _logger?.LogWarning("Ignoring '{info}' from {type}@{source} received from SqlDependency", args.Info, args.Type, args.Source);
                    return;
            }
        }
    }

    /// <summary>
    /// Reads the database's current change-tracking version.
    /// </summary>
    /// <param name="stoppingToken">Cancels the query.</param>
    private async Task<long> GetChangeTrackingVersion(CancellationToken stoppingToken)
    {
        var query = "SELECT CHANGE_TRACKING_CURRENT_VERSION() as value";
        return await _dbContext.Database.SqlQueryRaw<long>(query).FirstOrDefaultAsync(stoppingToken);
    }

    /// <summary>
    /// Applies every change reported since <paramref name="lastVersion"/> to the configuration provider
    /// and triggers a reload if anything was removed or actually differs.
    /// </summary>
    /// <param name="lastVersion">Change-tracking version after which changes are requested.</param>
    private void UpdateConfiguration(long lastVersion)
    {
        var changed = false;
        foreach (var (key, value) in GetChanges(lastVersion))
        {
            if (value is null)
            {
                // The LEFT JOIN produced no value for this key, so drop it from the provider.
                changed = true;
                _configProvider.Remove(key);
            }
            else if (!_configProvider.TryGet(key, out var oldValue) || !string.Equals(oldValue, value))
            {
                changed = true;
                _configProvider.Set(key, value);
            }
        }

        if (changed)
        {
            _configProvider.OnReload();
        }
    }

    /// <summary>
    /// Builds the query returning the key and current value of every row changed since
    /// <paramref name="lastVersion"/>.
    /// </summary>
    /// <param name="lastVersion">Change-tracking version passed to <c>CHANGETABLE</c> as a parameter.</param>
    private IQueryable<(string key, string? value)> GetChanges(long lastVersion)
    {
        _changesQueryTemplate ??= GetQueryTemplate();

        return _dbContext.Database.SqlQueryRaw<TSetting>(_changesQueryTemplate, lastVersion)
            .Select(s => ValueTuple.Create(s.Key, string.Equals(s.Value, null) ? null : s.Value)); // hack to get EF to not throw if Value is null
    }

    /// <summary>
    /// Produces the <c>CHANGETABLE</c> query template for <typeparamref name="TSetting"/>'s table.
    /// <c>{0}</c> is left as the placeholder for the last synchronised version.
    /// </summary>
    private string GetQueryTemplate()
    {
        var entityType = _dbContext.Model.FindEntityType(typeof(TSetting))!;

        // GetSchema() is null when the entity uses the default schema; emitting "[]." would be invalid SQL.
        var schema = entityType.GetSchema();
        var tableName = Quote(entityType.GetTableName()!);
        var schemaQualifiedTableName = schema is null ? tableName : $"{Quote(schema)}.{tableName}";
        var keyColumn = Quote(entityType.FindPrimaryKey()!.Properties.Single().GetColumnName());
        var valueColumn = Quote(entityType.FindProperty(nameof(ISetting.Value))!.GetColumnName());

        return $$"""
            SELECT ct.{{keyColumn}}, s.{{valueColumn}}
            FROM CHANGETABLE( CHANGES {{schemaQualifiedTableName}}, {0} ) AS ct
            LEFT JOIN {{schemaQualifiedTableName}} AS s ON ct.{{keyColumn}} = s.{{keyColumn}}
            """;

        // Bracket-delimits a T-SQL identifier, doubling any closing bracket inside it.
        static string Quote(string identifier) => "[" + identifier.Replace("]", "]]") + "]";
    }

    /// <summary>
    /// Closes and re-opens the context's connection after a dependency failure.
    /// </summary>
    /// <param name="stoppingToken">Cancels the open attempt; cancellation propagates to the caller.</param>
    /// <returns><c>true</c> if the connection was re-opened; <c>false</c> if opening failed.</returns>
    private async Task<bool> ReconnectOnDependencyException(CancellationToken stoppingToken)
    {
        // explicitly close connection to force subsequent OpenConnectionAsync() to actually try to open the connection instead of it possibly being a no-op
        await _dbContext.Database.CloseConnectionAsync();
        try
        {
            await _dbContext.Database.OpenConnectionAsync(stoppingToken);
            return true;
        }
        catch (Exception e) when (e is not OperationCanceledException)
        {
            _logger?.LogWarning(e, "Exception while attempting to reconnect to database. Configuration updates from database will no longer occur");
            return false;
        }
    }

    /*
        NOTE: GetChangesNotWorking below is not called anywhere; it is kept only as a record of this investigation.

        The query here is throwing an exception for some reason. Perhaps it is getting confused about the key column being aliased as Value?
        From the logs:

        warn: RelationalEventId.QueryPossibleUnintendedUseOfEqualsWarning[20501] (Microsoft.EntityFrameworkCore.Query)
              Possible unintended use of method 'Equals' for arguments 's.Value' and 's0.Key' of different types in a query. This comparison will always return false.
        ...
        Generated query execution expression:
        'queryContext => SingleQueryingEnumerable.Create<ValueTuple<string, string>>(
            relationalQueryContext: (RelationalQueryContext)queryContext,
            relationalCommandResolver: parameters => [LIFTABLE Constant: RelationalCommandCache.QueryExpression(
                Client Projections:
                    0 -> 0
                    1 -> 1
                SELECT s.Value, CASE
                    WHEN s0.Value == NULL THEN NULL
                    ELSE s0.Value
                END
                FROM SELECT [Key] AS Value FROM CHANGETABLE( CHANGES [dbo].[Settings], {0} )
                LEFT JOIN dbo.Settings AS s0 ON CAST(0 AS bit)) | Resolver: c => new RelationalCommandCache(

        What's up with that JOIN .. ON clause?!
    */
    private IQueryable<(string key, string? value)> GetChangesNotWorking(long lastVersion)
    {
        var entityType = _dbContext.Model.FindEntityType(typeof(TSetting))!;
        var schemaQualifiedTableName = $"[{entityType.GetSchema()}].[{entityType.GetTableName()}]";
        var keyColumn = $"[{entityType.FindPrimaryKey()!.Properties.Single().GetColumnName()}]";

        var changesQueryTemplate = $"SELECT {keyColumn} AS Value FROM CHANGETABLE( CHANGES {schemaQualifiedTableName}, {{0}} )";

        var query = from changedKey in _dbContext.Database.SqlQueryRaw<string>(changesQueryTemplate, lastVersion)
                    join s in _dbContext.Settings
                        on changedKey equals s.Key into joined
                    from j in joined.DefaultIfEmpty()
                    select ValueTuple.Create(changedKey, string.Equals(j.Value, null) ? null : j.Value);
        return query;
    }

    /// <summary>
    /// Disposes the background service and the options-change subscription.
    /// </summary>
    public override void Dispose()
    {
        base.Dispose();
        _onChangeHandler?.Dispose();
    }
}