using System.ComponentModel.DataAnnotations;
using System.Data;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Options = RAIC.Extensions.Configuration.EntityFrameworkCore.PostgreSQL.PostgreSQLNotificationConfigurationReloaderOptions;

namespace RAIC.Extensions.Configuration.EntityFrameworkCore.PostgreSQL;

/// <summary>
/// Settings consumed by <see cref="PostgreSQLNotificationConfigurationReloader"/>.
/// </summary>
public class PostgreSQLNotificationConfigurationReloaderOptions // must be public because it appears in public method signatures (various ServiceCollectionExtension methods)
{
    private const string DefaultChannelName = "settings_channel";

    /// <summary>Quiet period to wait after the most recent notification before the provider is reloaded.</summary>
    [Required]
    public TimeSpan DebounceInterval { get; set; } = TimeSpan.FromSeconds(1);

    /// <summary>Name of the channel passed to the <c>LISTEN</c> statement and matched against incoming notifications.</summary>
    [Required]
    public string ChannelName { get; set; } = DefaultChannelName;

    /// <summary>Number of connectivity checks performed before the reloader gives up.</summary>
    [Required]
    public byte MaxReconnectionAttempts { get; set; } = 5;

    /// <summary>Delay before the first connectivity check; later checks back off as this value times 2^attempt.</summary>
    [Required]
    public TimeSpan InitialReconnectionDelay { get; set; } = TimeSpan.FromSeconds(1);
}

/// <summary>
/// Background service that issues <c>LISTEN</c> on the configured channel and applies each
/// notification payload to the configuration provider, followed by a debounced reload.
/// </summary>
internal class PostgreSQLNotificationConfigurationReloader : Microsoft.Extensions.Hosting.BackgroundService
{
    /// <summary>
    /// Upper bound for a single backoff delay. <c>int.MaxValue</c> milliseconds is accepted by
    /// <see cref="Task.Delay(TimeSpan, CancellationToken)"/> on every runtime.
    /// </summary>
    private static readonly TimeSpan MaxBackoffDelay = TimeSpan.FromMilliseconds(int.MaxValue);

    private readonly DatabaseFacade _db;
    private readonly IEntityFrameworkCoreDbSetConfigurationProvider _configProvider;
    private readonly IDisposable? _onChangeHandler;
    private readonly ILogger? _logger;
    private CancellationTokenSource _cts = new();
    private Options _options;
    private Npgsql.NpgsqlConnection? _dbConnection; // kept so Dispose() can detach OnNotification

    /// <summary>
    /// Creates the reloader.
    /// </summary>
    /// <param name="dbFacade">Database facade whose underlying connection is used for listening.</param>
    /// <param name="configProvider">Provider that receives set/remove calls and reload triggers.</param>
    /// <param name="options">Options monitor; the current value is captured and replaced on every change.</param>
    /// <param name="logger">Optional logger; when <c>null</c> nothing is logged.</param>
    public PostgreSQLNotificationConfigurationReloader(
        DatabaseFacade dbFacade,
        IEntityFrameworkCoreDbSetConfigurationProvider configProvider,
        IOptionsMonitor<Options> options,
        ILogger<PostgreSQLNotificationConfigurationReloader>? logger = null)
    {
        _db = dbFacade;
        _configProvider = configProvider;
        _options = options.CurrentValue;
        _onChangeHandler = options.OnChange(opt => _options = opt);
        _logger = logger;
    }

    /// <summary>
    /// Subscribes to notifications, then repeatedly issues <c>LISTEN</c> and waits on the connection.
    /// The loop continues while the connection is open or a reconnection check succeeds.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the host is stopping.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var dbConnection = Initialise();

        do
        {
            await ListenForNotifications(stoppingToken);
            try
            {
                await dbConnection.WaitAsync(stoppingToken);
            }
            // Filter on the base cancellation type: a plain OperationCanceledException raised on
            // shutdown must propagate instead of being logged as a warning and re-entering the loop.
            catch (Exception e) when (e is not OperationCanceledException)
            {
                _logger?.LogWarning(e, "Exception while waiting for notifications on channel '{channel}'", _options.ChannelName);
            }
        }
        while (ConnectionState.Open == dbConnection.State || await IsReconnectionPossible(stoppingToken));

        _logger?.LogWarning("Giving up listening for notifications on channel '{channel}' because reconnection attempts exhausted. Configuration updates from database will no longer occur", _options.ChannelName);
    }

    /// <summary>
    /// Obtains the underlying Npgsql connection and attaches <see cref="OnNotification"/> to it.
    /// </summary>
    /// <returns>The connection that notifications are awaited on.</returns>
    private Npgsql.NpgsqlConnection Initialise()
    {
        var dbConnection = (Npgsql.NpgsqlConnection)_db.GetDbConnection();
        dbConnection.Notification += OnNotification;
        _dbConnection = dbConnection;
        return dbConnection;
    }

    /// <summary>
    /// Opens the connection and executes <c>LISTEN</c> for the configured channel.
    /// </summary>
    /// <param name="stoppingToken">Cancels opening the connection and executing the statement.</param>
    private async Task ListenForNotifications(CancellationToken stoppingToken)
    {
        // NOTE(review): the channel name is interpolated into raw SQL because LISTEN takes an
        // identifier rather than a parameter; it is assumed to come from trusted configuration.
        var listenStatement = $"LISTEN {_options.ChannelName}";
        await _db.OpenConnectionAsync(stoppingToken);
        await _db.ExecuteSqlRawAsync(listenStatement, stoppingToken);
        _logger?.LogInformation("Listening for notifications on channel '{channel}'", _options.ChannelName);
    }

    /// <summary>
    /// Handles a notification: cancels any pending debounce, applies the payload
    /// (<c>key=value</c> sets a value, a payload without '=' removes the key) and schedules a debounced reload.
    /// Notifications for other channels are ignored.
    /// </summary>
    private void OnNotification(object sender, Npgsql.NpgsqlNotificationEventArgs args)
    {
        _logger?.LogTrace("Received notification '{payload}' on channel '{channel}'", args.Payload, args.Channel);
        if (args.Channel != _options.ChannelName)
        {
            return;
        }

        // Swap in a fresh token source and cancel the previous one so that a pending debounce is abandoned.
        using (var previousCts = Interlocked.Exchange(ref _cts, new CancellationTokenSource()))
        {
            previousCts.Cancel();
        }

        ReadOnlySpan<string> keyValue = args.Payload.Split('=', 2); // DEBT: Do split without heap allocation
        switch (keyValue.Length)
        {
            case 2:
                _configProvider.Set(keyValue[0], keyValue[1]);
                break;
            case 1:
                _configProvider.Remove(keyValue[0]);
                break;
            default: // defensive only: Split always yields at least one element
                _logger?.LogWarning("Invalid '{channel}' payload '{payload}'", args.Channel, args.Payload);
                return; // Don't proceed to debounced reload
        }

        _ = DebouncedProviderReload(); // Intentionally running without awaiting, may be cancelled
    }

    /// <summary>
    /// Trigger reload of the configuration provider after waiting for a [cancelable] debounce period.
    /// </summary>
    /// <remarks>
    /// Cancellation is detected with <see cref="Task.WhenAny(Task[])"/>, so no exception is raised when the
    /// debounce is cancelled, unlike <see cref="SimpleDebouncedProviderReload"/>.
    /// </remarks>
    private async ValueTask DebouncedProviderReload()
    {
        var delayTask = Task.Delay(_options.DebounceInterval);
        var cancelableTask = Task.Delay(Timeout.Infinite, _cts.Token);
        var completedTask = await Task.WhenAny(delayTask, cancelableTask).ConfigureAwait(false);

        if (ReferenceEquals(completedTask, delayTask)) // If the completed task is the delayTask, we reached debounce delay, proceed with reload
        {
            try
            {
                _configProvider.OnReload();
            }
            catch (Exception e)
            {
                // Nobody awaits this method, so a failure would otherwise go unobserved.
                _logger?.LogError(e, "Configuration reload failed after notification on channel '{channel}'", _options.ChannelName);
            }
        }
        else
        {
            _logger?.LogTrace("Debounce operation canceled due to new notification");
        }
    }

    /// <summary>
    /// Trigger reload of the configuration provider after waiting for a [cancelable] debounce period.
    /// </summary>
    /// <remarks>
    /// Alternative to <see cref="DebouncedProviderReload"/> that relies on a cancellation exception being
    /// thrown and caught each time the debounce is cancelled. Not called within this class.
    /// </remarks>
    private async ValueTask SimpleDebouncedProviderReload()
    {
        try
        {
            await Task.Delay(_options.DebounceInterval, _cts.Token);
            _configProvider.OnReload();
        }
        catch (OperationCanceledException)
        {
            _logger?.LogTrace("Debounce operation canceled due to new notification");
        }
    }

    /// <summary>
    /// Checks connectivity up to <c>MaxReconnectionAttempts</c> times with exponential backoff.
    /// </summary>
    /// <param name="stoppingToken">Cancels the delays and connectivity checks.</param>
    /// <returns><c>true</c> as soon as a check succeeds; <c>false</c> once all attempts are used up.</returns>
    private async Task<bool> IsReconnectionPossible(CancellationToken stoppingToken)
    {
        await Task.Delay(_options.InitialReconnectionDelay, stoppingToken);

        for (var i = 1; i <= _options.MaxReconnectionAttempts; ++i)
        {
            _logger?.LogInformation("Reconnection check #{i}", i);
            var canConnectTask = _db.CanConnectAsync(stoppingToken);
            var backoffTask = Task.Delay(BackoffDelay(_options.InitialReconnectionDelay, i), stoppingToken);
            var completedTask = await Task.WhenAny(backoffTask, canConnectTask);

            if (ReferenceEquals(completedTask, canConnectTask)) // connect finished first
            {
                if (await canConnectTask)
                {
                    return true; // if can connect return immediately
                }

                await backoffTask; // connect failed, wait for backoff time before trying again
            }
            else if (await canConnectTask) // backoff finished first, wait for the connection result before deciding whether to go around again
            {
                return true;
            }
        }

        return false;
    }

    /// <summary>
    /// Computes <paramref name="initialDelay"/> times 2^<paramref name="attempt"/>, capped at
    /// <see cref="MaxBackoffDelay"/>.
    /// </summary>
    /// <remarks>
    /// A plain <c>1 &lt;&lt; attempt</c> turns negative at 31 and wraps beyond that, and the uncapped
    /// product can exceed what <see cref="Task.Delay(TimeSpan, CancellationToken)"/> accepts.
    /// </remarks>
    private static TimeSpan BackoffDelay(TimeSpan initialDelay, int attempt)
    {
        var factor = Math.Pow(2, attempt);
        return initialDelay.Ticks * factor >= MaxBackoffDelay.Ticks
            ? MaxBackoffDelay
            : initialDelay * factor;
    }

    /// <summary>
    /// Disposes the base service, detaches the notification handler, cancels any pending debounce
    /// and disposes the options change subscription.
    /// </summary>
    public override void Dispose()
    {
        base.Dispose();

        if (_dbConnection is not null)
        {
            _dbConnection.Notification -= OnNotification; // avoid keeping this instance reachable from the connection
        }

        _cts.Cancel(); // following implementation in base class which doesn't Dispose() its _stoppingCts, just calls Cancel()
        _onChangeHandler?.Dispose();
    }
}