You've already forked Extensions.Configuration.EntityFrameworkCore
Working implementation
This commit is contained in:
@@ -0,0 +1,125 @@
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Options;
|
||||
using Options = RAIC.Extensions.Configuration.EntityFrameworkCore.PostgreSQL.PostgreSQLNotificationConfigurationReloaderOptions;
|
||||
|
||||
namespace RAIC.Extensions.Configuration.EntityFrameworkCore.PostgreSQL.Extensions;
|
||||
|
||||
public static class ServiceCollectionExtensions
|
||||
{
|
||||
|
||||
public delegate OptionsBuilder<T> OptionsTransformer<T>(OptionsBuilder<T> transform) where T : class;
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// Adds <see cref="PostgreSQLNotificationConfigurationReloader"/> (a <see cref="Microsoft.Extensions.Hosting.IHostedService"/> implementation)
|
||||
/// and supporting services to the <see cref="IServiceCollection"/>.
|
||||
/// </summary>
|
||||
/// <typeparam name="TDbContext">Type of the <see cref="DbContext"/> to use to obtain the underlying database connection</typeparam>
|
||||
/// <param name="services">The service collection to add the services too</param>
|
||||
/// <returns>The service collection it was called on now with added services</returns>
|
||||
public static IServiceCollection AddPostgreSQLNotificationConfigurationReloadService<TDbContext>(this IServiceCollection services)
|
||||
where TDbContext : DbContext
|
||||
{
|
||||
return services.AddPostgreSQLNotificationConfigurationReloadService<TDbContext>(static _ => { });
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Adds <see cref="PostgreSQLNotificationConfigurationReloader"/> (a <see cref="Microsoft.Extensions.Hosting.IHostedService"/> implementation)
|
||||
/// and supporting services to the <see cref="IServiceCollection"/>.
|
||||
/// </summary>
|
||||
/// <typeparam name="TDbContext">Type of the <see cref="DbContext"/> to use to obtain the underlying database connection</typeparam>
|
||||
/// <param name="services">The service collection to add the services too</param>
|
||||
/// <param name="configure">
|
||||
/// Action to manually configure the <see cref="Options"/> instance consumed by <see cref="PostgreSQLNotificationConfigurationReloader"/> eg.
|
||||
/// <code>
|
||||
/// options => {
|
||||
/// options.DebounceInterval = TimeSpan.FromSeconds(2);
|
||||
/// options.ChannelName = "awesome_channel";
|
||||
/// }
|
||||
/// </code>
|
||||
/// </param>
|
||||
/// <returns>The service collection it was called on now with added services</returns>
|
||||
public static IServiceCollection AddPostgreSQLNotificationConfigurationReloadService<TDbContext>(this IServiceCollection services, Action<Options> configure)
|
||||
where TDbContext : DbContext
|
||||
{
|
||||
var optionsBuilder = services.AddCoreServices<TDbContext>();
|
||||
|
||||
optionsBuilder.Configure(configure);
|
||||
|
||||
return services;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Adds <see cref="PostgreSQLNotificationConfigurationReloader"/> (a <see cref="Microsoft.Extensions.Hosting.IHostedService"/> implementation)
|
||||
/// and supporting services to the <see cref="IServiceCollection"/>.
|
||||
/// </summary>
|
||||
/// <typeparam name="TDbContext">Type of the <see cref="DbContext"/> to use to obtain the underlying database connection</typeparam>
|
||||
/// <param name="services">The service collection to add the services too</param>
|
||||
/// <param name="optionsTransform">
|
||||
/// Transformer delegate to apply to the <see cref="Options"/> instance consumed by <see cref="PostgreSQLNotificationConfigurationReloader"/> eg.
|
||||
/// <code>
|
||||
/// optionsBuilder => optionsBuilder.Bind(context.Configuration)
|
||||
/// </code>
|
||||
/// </param>
|
||||
/// <returns>The service collection it was called on now with added services</returns>
|
||||
public static IServiceCollection AddPostgreSQLNotificationConfigurationReloadService<TDbContext>(this IServiceCollection services, OptionsTransformer<Options> optionsTransform)
|
||||
where TDbContext : DbContext
|
||||
{
|
||||
var optionsBuilder = services.AddCoreServices<TDbContext>();
|
||||
|
||||
optionsTransform(optionsBuilder);
|
||||
|
||||
return services;
|
||||
}
|
||||
|
||||
|
||||
private static OptionsBuilder<Options> AddCoreServices<TDbContext>(this IServiceCollection services)
|
||||
where TDbContext : DbContext
|
||||
{
|
||||
services.AddSingleton(static provider => provider.GetRequiredService<TDbContext>().Database)
|
||||
.AddSingleton(static provider =>
|
||||
{
|
||||
var configRoot = (IConfigurationRoot)provider.GetRequiredService<IConfiguration>(); // DEBT: Is this cast always safe?
|
||||
return configRoot.Providers.OfType<IEntityFrameworkCoreDbSetConfigurationProvider>().Single();
|
||||
})
|
||||
.AddHostedService<PostgreSQLNotificationConfigurationReloader>();
|
||||
|
||||
return services.AddOptions<Options>().ValidateDataAnnotations().ValidateOnStart();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
*
|
||||
CREATE OR REPLACE FUNCTION settings_poc.notify_setting_change()
|
||||
RETURNS trigger AS $$
|
||||
BEGIN
|
||||
PERFORM pg_notify('setting_channel', concat(NEW.key,'=',NEW.value));
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE OR REPLACE FUNCTION settings_poc.notify_setting_remove()
|
||||
RETURNS trigger AS $$
|
||||
BEGIN
|
||||
PERFORM pg_notify('setting_channel', OLD.key);
|
||||
RETURN OLD;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE TRIGGER setting_insert_trigger
|
||||
AFTER INSERT ON settings_poc.settings
|
||||
FOR EACH ROW EXECUTE FUNCTION settings_poc.notify_setting_change();
|
||||
|
||||
CREATE TRIGGER setting_update_trigger
|
||||
AFTER UPDATE ON settings_poc.settings
|
||||
FOR EACH ROW WHEN (NEW.value <> OLD.value) EXECUTE FUNCTION settings_poc.notify_setting_change();
|
||||
|
||||
CREATE TRIGGER setting_delete_trigger
|
||||
AFTER DELETE ON settings_poc.settings
|
||||
FOR EACH ROW EXECUTE FUNCTION settings_poc.notify_setting_remove();
|
||||
|
||||
*
|
||||
*/
|
||||
@@ -0,0 +1,187 @@
|
||||
using System.ComponentModel.DataAnnotations;
|
||||
using System.Data;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.EntityFrameworkCore.Infrastructure;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using Options = RAIC.Extensions.Configuration.EntityFrameworkCore.PostgreSQL.PostgreSQLNotificationConfigurationReloaderOptions;
|
||||
|
||||
namespace RAIC.Extensions.Configuration.EntityFrameworkCore.PostgreSQL;
|
||||
|
||||
public class PostgreSQLNotificationConfigurationReloaderOptions // must be public because it appears in public method signatures (various ServiceCollectionExtension methods)
|
||||
{
|
||||
private const string DefaultChannelName = "settings_channel";
|
||||
|
||||
[Required]
|
||||
public TimeSpan DebounceInterval { get; set; } = TimeSpan.FromSeconds(1);
|
||||
|
||||
[Required]
|
||||
public string ChannelName { get; set; } = DefaultChannelName;
|
||||
|
||||
[Required]
|
||||
public byte MaxReconnectionAttempts { get; set; } = 5;
|
||||
|
||||
[Required]
|
||||
public TimeSpan InitialReconnectionDelay { get; set; } = TimeSpan.FromSeconds(1);
|
||||
}
|
||||
|
||||
|
||||
|
||||
internal class PostgreSQLNotificationConfigurationReloader : Microsoft.Extensions.Hosting.BackgroundService
|
||||
{
|
||||
private readonly DatabaseFacade _db;
|
||||
private readonly IEntityFrameworkCoreDbSetConfigurationProvider _configProvider;
|
||||
private readonly IDisposable? _onChangeHandler;
|
||||
private readonly ILogger? _logger;
|
||||
|
||||
private CancellationTokenSource _cts = new();
|
||||
private Options _options;
|
||||
|
||||
|
||||
public PostgreSQLNotificationConfigurationReloader(DatabaseFacade dbFacade, IEntityFrameworkCoreDbSetConfigurationProvider configProvider, IOptionsMonitor<Options> options, ILogger<PostgreSQLNotificationConfigurationReloader>? logger = null)
|
||||
{
|
||||
_db = dbFacade;
|
||||
_configProvider = configProvider;
|
||||
_options = options.CurrentValue;
|
||||
_onChangeHandler = options.OnChange(opt => _options = opt);
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
var dbConnection = Initialise();
|
||||
|
||||
do
|
||||
{
|
||||
await ListenForNotifications(stoppingToken);
|
||||
|
||||
try
|
||||
{
|
||||
await dbConnection.WaitAsync(stoppingToken);
|
||||
}
|
||||
catch (Exception e) when (e is not TaskCanceledException)
|
||||
{
|
||||
_logger?.LogWarning(e, "Exception while waiting for notifications on channel '{channel}'", _options.ChannelName);
|
||||
}
|
||||
}
|
||||
while (await IsReconnectionPossible(stoppingToken));
|
||||
|
||||
_logger?.LogWarning("Giving up listening for notifications on channel '{channel}' because reconnection attempts exhausted. Configuration updates from database will no longer occur", _options.ChannelName);
|
||||
}
|
||||
|
||||
private Npgsql.NpgsqlConnection Initialise()
|
||||
{
|
||||
var dbConnection = (Npgsql.NpgsqlConnection)_db.GetDbConnection();
|
||||
dbConnection.Notification += OnNotification;
|
||||
|
||||
return dbConnection;
|
||||
}
|
||||
|
||||
private async Task ListenForNotifications(CancellationToken stoppingToken)
|
||||
{
|
||||
var listenStatement = $"LISTEN {_options.ChannelName}";
|
||||
await _db.OpenConnectionAsync(stoppingToken);
|
||||
await _db.ExecuteSqlRawAsync(listenStatement, stoppingToken);
|
||||
_logger?.LogInformation("Listening for notifications on channel '{channel}'", _options.ChannelName);
|
||||
}
|
||||
|
||||
private void OnNotification(object sender, Npgsql.NpgsqlNotificationEventArgs args)
|
||||
{
|
||||
_logger?.LogTrace("Received notification '{payload}' on channel '{channel}'", args.Payload, args.Channel);
|
||||
if (args.Channel != _options.ChannelName)
|
||||
return;
|
||||
|
||||
using (var previousCts = Interlocked.Exchange(ref _cts, new()))
|
||||
{
|
||||
previousCts.Cancel();
|
||||
}
|
||||
|
||||
ReadOnlySpan<string> keyValue = args.Payload.Split('=', 2); // DEBT: Do split without heap allocation
|
||||
switch (keyValue.Length)
|
||||
{
|
||||
case 2:
|
||||
_configProvider.Set(keyValue[0], keyValue[1]);
|
||||
break;
|
||||
case 1:
|
||||
_configProvider.Remove(keyValue[0]);
|
||||
break;
|
||||
default:
|
||||
_logger?.LogWarning("Invalid '{channel}' payload '{payload}'", args.Channel, args.Payload);
|
||||
return; // Don't proceed to debounced reload
|
||||
}
|
||||
|
||||
var _ = DebouncedProviderReload(); // Intentionally running without awaiting, may be cancelled
|
||||
}
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// Trigger reload of the <see cref="IEntityFrameworkCoreDbSetConfigurationProvider"/> after waiting for a [cancelable] debounce period
|
||||
/// </summary>
|
||||
/// <remarks>Does not log <see cref="TaskCanceledException "/> each time debounce task is cancelled, unlike <see cref="SimpleDebouncedProviderReload"/></remarks>
|
||||
private async ValueTask DebouncedProviderReload()
|
||||
{
|
||||
var delayTask = Task.Delay(_options.DebounceInterval);
|
||||
var cancelableTask = Task.Delay(Timeout.Infinite, _cts.Token);
|
||||
var completedTask = await Task.WhenAny(delayTask, cancelableTask).ConfigureAwait(false);
|
||||
|
||||
if (completedTask == delayTask) // If the completed task is the delayTask, we reached debounce delay, proceed with reload
|
||||
{
|
||||
_configProvider.OnReload();
|
||||
}
|
||||
else
|
||||
{
|
||||
_logger?.LogTrace("Debounce operation canceled due to new notification");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// Trigger reload of the <see cref="IEntityFrameworkCoreDbSetConfigurationProvider"/> after waiting for a [cancelable] debounce period
|
||||
/// </summary>
|
||||
/// <remarks>Logs <see cref="TaskCanceledException "/> each time debounce task is cancelled</remarks>
|
||||
private async ValueTask SimpleDebouncedProviderReload()
|
||||
{
|
||||
try
|
||||
{
|
||||
await Task.Delay(_options.DebounceInterval, _cts.Token);
|
||||
_configProvider.OnReload();
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
_logger?.LogTrace("Debounce operation canceled due to new notification");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private async Task<bool> IsReconnectionPossible(CancellationToken stoppingToken)
|
||||
{
|
||||
await Task.Delay(_options.InitialReconnectionDelay, stoppingToken);
|
||||
|
||||
for (var i = 1; i <= _options.MaxReconnectionAttempts; ++i)
|
||||
{
|
||||
_logger?.LogInformation("Reconnection check #{i}", i);
|
||||
var canConnectTask = _db.CanConnectAsync(stoppingToken);
|
||||
var backoffTask = Task.Delay(_options.InitialReconnectionDelay * (1 << i), stoppingToken);
|
||||
var completedTask = await Task.WhenAny(backoffTask, canConnectTask);
|
||||
|
||||
if (completedTask == canConnectTask) // connect finished first
|
||||
{
|
||||
if (await canConnectTask) return true; // if can connect return immediately
|
||||
await backoffTask; // connect failed, wait for backoff time before trying again
|
||||
}
|
||||
else // backoff finished first, wait for connection task result before decided whether to go around again or return immediately
|
||||
{
|
||||
if (await canConnectTask) return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public override void Dispose()
|
||||
{
|
||||
base.Dispose();
|
||||
_cts.Cancel(); // following implementation in base class which doesn't Dispose() its _stoppingCts, just calls Cancel()
|
||||
_onChangeHandler?.Dispose();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,19 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<TargetFramework>net8.0</TargetFramework>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<Nullable>enable</Nullable>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.Extensions.Options.DataAnnotations" Version="10.0.0" />
|
||||
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="10.0.0" />
|
||||
<PackageReference Include="Npgsql" Version="10.0.0" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\RAIC.Extensions.Configuration.EntityFrameworkCore\RAIC.Extensions.Configuration.EntityFrameworkCore.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
@@ -0,0 +1,110 @@
|
||||
# RAIC.Extensions.Configuration.EntityFrameworkCore.PostgreSQL
|
||||
|
||||
This library enhances `RAIC.Extensions.Configuration.EntityFrameworkCore` with support for reload on update via PostgreSQL's `LISTEN`/`NOTIFY`
|
||||
[capabilities](https://www.postgresql.org/docs/current/sql-listen.html) as exposed via [npgsql](https://www.npgsql.org/doc/wait.html).
|
||||
|
||||
## Goals
|
||||
1. No polling!
|
||||
1. Updates happen in background via worker service (`IHostedService`)
|
||||
1. Only update settings which change rather than reloading all of them
|
||||
|
||||
## Requirements
|
||||
* .NET 8
|
||||
|
||||
## Gotchas
|
||||
* Setting values cannot be `null` (as signified by the `RequiredAttribute` on `ISetting.Value`)
|
||||
* Setting keys must not contain the `=` character (similar to `CommandLineConfigurationProvider` & `EnvironmentVariablesConfigurationProvider`)
|
||||
* Small window of opportunity for updates to be missed during reconnection process
|
||||
* Consider adding `Keepalive` to your conenction string (https://www.npgsql.org/doc/keepalive.html) if its not already present
|
||||
|
||||
## Known Issues
|
||||
* Not tested under load
|
||||
|
||||
## Configuration Options
|
||||
There are four properties which can be configured (cf. the `PostgreSQLNotificationConfigurationBackgroundServiceOptions` POCO)
|
||||
1. `ChannelName` - the name of the notification channel to listen to for updates
|
||||
1. `DebounceInterval` - the time interval to wait after a notification before signaling the `IEntityFrameworkCoreDbSetConfigurationProvider` to reload.
|
||||
1. `MaxReconnectionAttempts` - Maximum number of times to retry connecting after a connection dropout (eg. PostgreSQL restart, network connectivity issue)
|
||||
1. `InitialReconnectionDelay` - How long to wait after connection dropout before attempting to reconnection.
|
||||
|
||||
## Setup
|
||||
For `PostgreSQLNotificationConfigurationReloader` to work it requires triggers on the `Settings` table to be established which send notifications to the channel
|
||||
corresponding to that specified by the `ChannelName` option (defaults to `settings_channel`). SQL similar to below is one (but not only) way this can be acheived:
|
||||
|
||||
```sql
|
||||
CREATE OR REPLACE FUNCTION notify_setting_change()
|
||||
RETURNS trigger AS $$
|
||||
BEGIN
|
||||
PERFORM pg_notify('settings_channel', concat(NEW.key,'=',NEW.value));
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE OR REPLACE FUNCTION notify_setting_remove()
|
||||
RETURNS trigger AS $$
|
||||
BEGIN
|
||||
PERFORM pg_notify('settings_channel', OLD.key);
|
||||
RETURN OLD;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE TRIGGER setting_insert_trigger
|
||||
AFTER INSERT ON settings
|
||||
FOR EACH ROW EXECUTE FUNCTION notify_setting_change();
|
||||
|
||||
CREATE TRIGGER setting_update_trigger
|
||||
AFTER UPDATE ON settings
|
||||
FOR EACH ROW WHEN (NEW.value <> OLD.value) EXECUTE FUNCTION notify_setting_change();
|
||||
|
||||
CREATE TRIGGER setting_key_update_trigger
|
||||
AFTER UPDATE ON settings
|
||||
FOR EACH ROW WHEN (NEW.key <> OLD.key) EXECUTE FUNCTION notify_setting_remove();
|
||||
|
||||
CREATE TRIGGER setting_delete_trigger
|
||||
AFTER DELETE ON settings
|
||||
FOR EACH ROW EXECUTE FUNCTION notify_setting_remove();
|
||||
```
|
||||
|
||||
Reccommend adding your SQL to the migration which adds the `Settings` table/view (or a new migration if that table/view already exists).
|
||||
|
||||
## Usage Example
|
||||
|
||||
```csharp
|
||||
|
||||
using RAIC.Extensions.Configuration.EntityFrameworkCore;
|
||||
using RAIC.Extensions.Configuration.EntityFrameworkCore.Extensions;
|
||||
using RAIC.Extensions.Configuration.EntityFrameworkCore.PostgreSQL.Extensions;
|
||||
|
||||
public record Setting : ISetting
|
||||
{
|
||||
[Key]
|
||||
public required string Key { get; set; }
|
||||
|
||||
[Required]
|
||||
public required string Value { get; set; }
|
||||
}
|
||||
|
||||
public class MyDbContext(DbContextOptions<MyDbContext> options) : DbContext(options), ISettingsDbContext<DbSet<Setting>, Setting>
|
||||
{
|
||||
public DbSet<Setting> Settings { get; set; }
|
||||
}
|
||||
|
||||
|
||||
var builder = Host.CreateApplicationBuilder(args); // or WebApplication.CreateBuilder(args);
|
||||
|
||||
// build an initial configuration
|
||||
builder.Configuration.AddJsonFile("appsettings.json")
|
||||
...
|
||||
.AddUserSecrets<Program>(); // or wherever your connection string lives
|
||||
|
||||
builder.Configuration.AddDbSet<MyDbContext, Setting>(dbContextOptions => dbContextOptions.UseNpgsql(builder.Configuration.GetConnectionString("Default")));
|
||||
|
||||
...
|
||||
// Add the PostgreSQLNotificationConfigurationReloader background service and supporting services to obtain setting reloading functionalty
|
||||
builder.Services.AddPostgreSQLNotificationConfigurationReloadService<SettingsDbContext>(); // uses default settings, other overrides exist - see code docs
|
||||
|
||||
await builder.Build().RunAsync(); // use config as normal
|
||||
|
||||
```
|
||||
|
||||
Read more about [Configuration](https://docs.microsoft.com/en-us/dotnet/core/extensions/configuration) and [Options](https://docs.microsoft.com/en-us/dotnet/core/extensions/options) on the Microsoft Docs site.
|
||||
Reference in New Issue
Block a user