// Repository: Extensions.Configuration.EntityFrameworkCore (fork) - C# source, 200 lines, 7.5 KiB
using System.ComponentModel.DataAnnotations;
|
|
using System.Data;
|
|
using Microsoft.EntityFrameworkCore;
|
|
using Microsoft.EntityFrameworkCore.Infrastructure;
|
|
using Microsoft.Extensions.Logging;
|
|
using Microsoft.Extensions.Options;
|
|
using Options = RAIC.Extensions.Configuration.EntityFrameworkCore.PostgreSQL.PostgreSQLNotificationConfigurationReloaderOptions;
|
|
|
|
namespace RAIC.Extensions.Configuration.EntityFrameworkCore.PostgreSQL;
|
|
|
|
/// <summary>
/// Settings for the PostgreSQL notification based configuration reloader.
/// </summary>
/// <remarks>
/// Must be public because it appears in public method signatures (various ServiceCollectionExtension methods).
/// </remarks>
public class PostgreSQLNotificationConfigurationReloaderOptions
{
    /// <summary>Channel used when <see cref="ChannelName"/> is not configured explicitly.</summary>
    private const string FallbackChannelName = "settings_channel";

    /// <summary>
    /// How long the reloader waits after a notification before triggering the provider reload.
    /// A newer notification cancels the pending wait. Defaults to one second.
    /// </summary>
    [Required] public TimeSpan DebounceInterval { get; set; } = TimeSpan.FromSeconds(1);

    /// <summary>
    /// Name of the PostgreSQL notification channel the reloader listens on; notifications from any other
    /// channel are ignored. Defaults to <c>settings_channel</c>.
    /// </summary>
    [Required] public string ChannelName { get; set; } = FallbackChannelName;

    /// <summary>
    /// Maximum number of reconnection checks performed before the reloader stops listening. Defaults to 5.
    /// </summary>
    [Required] public byte MaxReconnectionAttempts { get; set; } = 5;

    /// <summary>
    /// Delay awaited before the first reconnection check; it is also the base value from which the
    /// growing per-attempt backoff is derived. Defaults to one second.
    /// </summary>
    [Required] public TimeSpan InitialReconnectionDelay { get; set; } = TimeSpan.FromSeconds(1);
}
|
|
|
|
|
|
|
|
/// <summary>
/// Background service that issues a PostgreSQL <c>LISTEN</c> on the configured channel and applies every
/// notification payload to the <see cref="IEntityFrameworkCoreDbSetConfigurationProvider"/>, followed by a
/// debounced reload of that provider.
/// </summary>
/// <remarks>
/// <para>
/// A payload of the form <c>key=value</c> is forwarded to <c>Set</c>; a payload without <c>=</c> is forwarded to
/// <c>Remove</c>. When the connection is no longer open the service performs reconnection checks with a growing
/// backoff and stops listening once <c>MaxReconnectionAttempts</c> checks have been used up.
/// </para>
/// <para>
/// The <c>LISTEN</c> statement is only issued on start-up and after a successful reconnection check, whereas the
/// channel filter in the notification handler always reads the current options. A channel name changed at
/// runtime therefore only takes full effect after the next reconnect.
/// </para>
/// </remarks>
internal class PostgreSQLNotificationConfigurationReloader : Microsoft.Extensions.Hosting.BackgroundService
{
    /// <summary>
    /// Upper bound for one backoff wait. <c>int.MaxValue</c> milliseconds is accepted by <c>Task.Delay</c>,
    /// so a large attempt count can no longer produce an out-of-range delay.
    /// </summary>
    private static readonly TimeSpan MaxBackoffDelay = TimeSpan.FromMilliseconds(int.MaxValue);

    private readonly DatabaseFacade _db;
    private readonly IEntityFrameworkCoreDbSetConfigurationProvider _configProvider;
    private readonly IDisposable? _onChangeHandler;
    private readonly ILogger? _logger;

    /// <summary>Cancellation source of the currently pending debounce; swapped on every accepted notification.</summary>
    private CancellationTokenSource _cts = new();

    /// <summary>Latest options snapshot; replaced by the options change callback.</summary>
    private Options _options;

    /// <summary>
    /// Creates the reloader and subscribes to option changes.
    /// </summary>
    /// <param name="dbFacade">Database facade whose connection is used for listening; it is cast to an Npgsql connection when the service starts.</param>
    /// <param name="configProvider">Provider that receives the <c>Set</c>/<c>Remove</c>/<c>OnReload</c> calls.</param>
    /// <param name="options">Monitor supplying the current and future option values.</param>
    /// <param name="logger">Optional logger; nothing is logged when it is <see langword="null"/>.</param>
    public PostgreSQLNotificationConfigurationReloader(DatabaseFacade dbFacade, IEntityFrameworkCoreDbSetConfigurationProvider configProvider, IOptionsMonitor<Options> options, ILogger<PostgreSQLNotificationConfigurationReloader>? logger = null)
    {
        _db = dbFacade;
        _configProvider = configProvider;
        _options = options.CurrentValue;
        _onChangeHandler = options.OnChange(opt => _options = opt);
        _logger = logger;
    }

    /// <summary>
    /// Subscribes to the channel and waits for notifications until the stopping token fires or the
    /// reconnection attempts are exhausted.
    /// </summary>
    /// <param name="stoppingToken">Signals that the host is shutting the service down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var dbConnection = Initialise();

        try
        {
            await ListenForNotifications(stoppingToken);

            do
            {
                try
                {
                    await dbConnection.WaitAsync(stoppingToken);
                }
                catch (Exception e) when (e is not OperationCanceledException)
                {
                    _logger?.LogWarning(e, "Exception while waiting for notifications on channel '{channel}'", _options.ChannelName);
                }
            }
            while (ConnectionState.Open == dbConnection.State || await TryReconnect(stoppingToken));

            _logger?.LogWarning("Giving up listening for notifications on channel '{channel}' because reconnection attempts exhausted. Configuration updates from database will no longer occur", _options.ChannelName);
        }
        catch (OperationCanceledException e)
        {
            _logger?.LogInformation(e, "Exiting due to signal from stopping token");
        }
        finally
        {
            // The connection belongs to the database facade, not to this service; detach the handler so the
            // connection does not keep calling into (and referencing) a service that has stopped
            dbConnection.Notification -= OnNotification;
        }
    }

    /// <summary>
    /// Obtains the Npgsql connection behind the database facade and attaches the notification handler.
    /// </summary>
    /// <returns>The connection on which notifications are awaited.</returns>
    private Npgsql.NpgsqlConnection Initialise()
    {
        var dbConnection = (Npgsql.NpgsqlConnection)_db.GetDbConnection();
        dbConnection.Notification += OnNotification;

        return dbConnection;
    }

    /// <summary>
    /// Handles one notification: applies the payload to the configuration provider and (re)starts the debounce
    /// that leads to the provider reload.
    /// </summary>
    /// <param name="sender">Event source (unused).</param>
    /// <param name="args">Channel and payload of the notification.</param>
    private void OnNotification(object sender, Npgsql.NpgsqlNotificationEventArgs args)
    {
        _logger?.LogTrace("Received notification '{payload}' on channel '{channel}'", args.Payload, args.Channel);
        if (args.Channel != _options.ChannelName)
        {
            return;
        }

        // Read the token before publishing the new source: once it is in the field, another notification may
        // swap it out and dispose it, after which reading Token would throw
        var debounceCts = new CancellationTokenSource();
        var debounceToken = debounceCts.Token;
        using (var previousCts = Interlocked.Exchange(ref _cts, debounceCts))
        {
            previousCts.Cancel(); // supersede the debounce started by the previous notification
        }

        ReadOnlySpan<string> keyValue = args.Payload.Split('=', 2); // DEBT: Do split without heap allocation
        switch (keyValue.Length)
        {
            case 2:
                _configProvider.Set(keyValue[0], keyValue[1]);
                break;
            case 1:
                _configProvider.Remove(keyValue[0]);
                break;
            default:
                // NOTE(review): Split with a count of 2 always yields one or two elements, so this branch is
                // purely defensive; an empty payload currently ends up in the Remove case with an empty key
                _logger?.LogWarning("Invalid '{channel}' payload '{payload}'", args.Channel, args.Payload);
                return; // Don't proceed to debounced reload
        }

        _ = DebouncedProviderReload(debounceToken); // Intentionally running without awaiting, may be cancelled
    }

    /// <summary>
    /// Opens the database connection (if required) and subscribes it to the configured channel.
    /// </summary>
    /// <param name="stoppingToken">Cancels opening the connection and executing the statement.</param>
    private async Task ListenForNotifications(CancellationToken stoppingToken)
    {
        var channelName = _options.ChannelName; // single read so statement and log message agree
        var listenStatement = BuildListenStatement(channelName);

        await _db.OpenConnectionAsync(stoppingToken);
        await _db.ExecuteSqlRawAsync(listenStatement, stoppingToken);
        _logger?.LogInformation("Listening for notifications on channel '{channel}'", channelName);
    }

    /// <summary>
    /// Builds the <c>LISTEN</c> statement with the channel name as a quoted identifier.
    /// </summary>
    /// <remarks>
    /// <c>LISTEN</c> cannot take its channel as a parameter, so the name has to be embedded in the statement.
    /// Quoting stops a configured name from altering the statement and keeps it case-sensitive, which matches
    /// the exact comparison performed in the notification handler.
    /// </remarks>
    /// <param name="channelName">Channel name exactly as configured.</param>
    /// <returns>The statement text to pass to the raw SQL API.</returns>
    private static string BuildListenStatement(string channelName)
    {
        var identifier = channelName
            .Replace("\"", "\"\"") // escape embedded quotes inside a quoted identifier
            .Replace("{", "{{")    // raw SQL text is treated as a composite format string ({0} placeholders),
            .Replace("}", "}}");   // so literal braces have to be doubled

        return "LISTEN \"" + identifier + "\"";
    }

    /// <summary>
    /// Trigger reload of the <see cref="IEntityFrameworkCoreDbSetConfigurationProvider"/> after waiting for a [cancelable] debounce period
    /// </summary>
    /// <remarks>
    /// Does not log <see cref="TaskCanceledException"/> each time debounce task is cancelled, unlike <see cref="SimpleDebouncedProviderReload"/>.
    /// The returned task is not awaited by the caller, so failures are logged here instead of being left unobserved.
    /// </remarks>
    /// <param name="debounceToken">Token that is cancelled when a newer notification supersedes this debounce.</param>
    private async Task DebouncedProviderReload(CancellationToken debounceToken)
    {
        try
        {
            var delayTask = Task.Delay(_options.DebounceInterval);
            var cancelableTask = Task.Delay(Timeout.Infinite, debounceToken);
            var completedTask = await Task.WhenAny(delayTask, cancelableTask).ConfigureAwait(false);

            if (!ReferenceEquals(completedTask, delayTask))
            {
                _logger?.LogTrace("Debounce operation canceled due to new notification");
                return;
            }

            // The debounce delay elapsed without a newer notification, proceed with reload
            _configProvider.OnReload();
        }
        catch (Exception e)
        {
            _logger?.LogError(e, "Debounced configuration reload failed for channel '{channel}'", _options.ChannelName);
        }
    }

    /// <summary>
    /// Trigger reload of the <see cref="IEntityFrameworkCoreDbSetConfigurationProvider"/> after waiting for a [cancelable] debounce period
    /// </summary>
    /// <remarks>
    /// Logs <see cref="TaskCanceledException"/> each time debounce task is cancelled.
    /// Not called from within this class; kept as the simpler alternative to <see cref="DebouncedProviderReload"/>.
    /// </remarks>
    private async ValueTask SimpleDebouncedProviderReload()
    {
        try
        {
            await Task.Delay(_options.DebounceInterval, _cts.Token);
            _configProvider.OnReload();
        }
        catch (OperationCanceledException)
        {
            _logger?.LogTrace("Debounce operation canceled due to new notification");
        }
    }

    /// <summary>
    /// Performs up to <c>MaxReconnectionAttempts</c> connectivity checks with a growing backoff and
    /// re-subscribes to the channel as soon as one succeeds.
    /// </summary>
    /// <param name="stoppingToken">Signals that the host is shutting the service down.</param>
    /// <returns><see langword="true"/> when listening was re-established; <see langword="false"/> when all attempts were used up.</returns>
    /// <exception cref="OperationCanceledException">The stopping token was cancelled while reconnecting.</exception>
    private async Task<bool> TryReconnect(CancellationToken stoppingToken)
    {
        await Task.Delay(_options.InitialReconnectionDelay, stoppingToken);

        for (var i = 1; i <= _options.MaxReconnectionAttempts; ++i)
        {
            _logger?.LogInformation("Reconnection check #{i}", i);
            var canConnectTask = _db.CanConnectAsync(stoppingToken);
            var backoffTask = Task.Delay(GetBackoffDelay(_options.InitialReconnectionDelay, i), stoppingToken);
            var completedTask = await Task.WhenAny(backoffTask, canConnectTask);

            // A cancelled backoff also counts as "finished first"; surface the shutdown here instead of
            // racing through the remaining attempts and reporting them as exhausted
            stoppingToken.ThrowIfCancellationRequested();

            if (ReferenceEquals(completedTask, backoffTask)) // backoff finished first
            {
                continue; // Assume connection task has failed
            }

            // can connect task finished first
            if (await canConnectTask) // if it finished with success
            {
                try
                {
                    await ListenForNotifications(stoppingToken);
                    return true;
                }
                catch (Exception e) when (e is not OperationCanceledException)
                {
                    // The connection may drop again between the check and the LISTEN; treat that as a failed
                    // attempt rather than letting it fault the whole background service
                    _logger?.LogWarning(e, "Reconnection check #{i} succeeded but listening on channel '{channel}' failed", i, _options.ChannelName);
                }
            }

            await backoffTask; // connect failed, wait for backoff time before trying again
        }

        return false;
    }

    /// <summary>
    /// Computes the backoff for a reconnection attempt: the initial delay doubled once per attempt, capped at
    /// <see cref="MaxBackoffDelay"/>.
    /// </summary>
    /// <remarks>
    /// Calculated in floating point ticks so that high attempt numbers saturate at the cap instead of
    /// overflowing an integer shift.
    /// </remarks>
    /// <param name="initialDelay">Base delay taken from the options.</param>
    /// <param name="attempt">One-based attempt number.</param>
    /// <returns>The delay to wait for this attempt.</returns>
    private static TimeSpan GetBackoffDelay(TimeSpan initialDelay, int attempt)
    {
        var scaledTicks = initialDelay.Ticks * Math.Pow(2, attempt);

        return scaledTicks >= MaxBackoffDelay.Ticks
            ? MaxBackoffDelay
            : TimeSpan.FromTicks((long)scaledTicks);
    }

    /// <summary>
    /// Stops the service, cancels a pending debounce and removes the options change subscription.
    /// </summary>
    public override void Dispose()
    {
        base.Dispose();
        _cts.Cancel(); // following implementation in base class which doesn't Dispose() its _stoppingCts, just calls Cancel()
        _onChangeHandler?.Dispose();
    }
}