Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 47 additions & 12 deletions src/ServiceControl.Transports.ASBS/ASBSTransportCustomization.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
namespace ServiceControl.Transports.ASBS
{
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;
using BrokerThroughput;
using Configuration;
using Microsoft.Extensions.DependencyInjection;
Expand Down Expand Up @@ -71,18 +75,7 @@ protected override void AddTransportForPrimaryCore(IServiceCollection services,
{
TopicToPublishTo = connectionSettings.TopicName,
TopicToSubscribeOn = connectionSettings.TopicName,
EventsToMigrateMap =
[
"ServiceControl.Contracts.CustomCheckFailed",
"ServiceControl.Contracts.CustomCheckSucceeded",
"ServiceControl.Contracts.HeartbeatRestored",
"ServiceControl.Contracts.HeartbeatStopped",
"ServiceControl.Contracts.FailedMessagesArchived",
"ServiceControl.Contracts.FailedMessagesUnArchived",
"ServiceControl.Contracts.MessageFailed",
"ServiceControl.Contracts.MessageFailureResolvedByRetry",
"ServiceControl.Contracts.MessageFailureResolvedManually"
]
EventsToMigrateMap = [.. transportSettings.EventTypesPublished.Select(t => t.FullName)]
});
}
else if (SettingsReader.TryRead<string>(serviceBusRootNamespace, "Topology", out var topologyJson))
Expand All @@ -104,5 +97,47 @@ protected override void AddTransportForMonitoringCore(IServiceCollection service
services.AddSingleton<IProvideQueueLength, QueueLengthProvider>();
services.AddHostedService(provider => provider.GetRequiredService<IProvideQueueLength>());
}

public override async Task ProvisionQueues(TransportSettings transportSettings, IEnumerable<string> additionalQueues)
{
await base.ProvisionQueues(transportSettings, additionalQueues);

if (transportSettings.EventTypesPublished.Count == 0)
{
return;
}

var connectionSettings = ConnectionStringParser.Parse(transportSettings.ConnectionString);

var managementClient = connectionSettings.AuthenticationMethod.BuildManagementClient();

var creationTasks = new List<Task>(transportSettings.EventTypesPublished.Count);
foreach (var publishedTopic in transportSettings.EventTypesPublished)
{
creationTasks.Add(CreateTopic(publishedTopic.FullName));
}
await Task.WhenAll(creationTasks);

async Task CreateTopic(string publishedTopic)
{
var topicToPublishTo = new CreateTopicOptions(connectionSettings.HierarchyNamespace != null
? $"{connectionSettings.HierarchyNamespace}/{publishedTopic}"
: publishedTopic)
{
EnableBatchedOperations = true,
MaxSizeInMegabytes = 5 * 1024, // we are currently not configuring this in the connection string so it uses the same default as the transport
EnablePartitioning = connectionSettings.EnablePartitioning,
};

try
{
await managementClient.CreateTopicAsync(topicToPublishTo).ConfigureAwait(false);
}
catch (ServiceBusException sbe) when (sbe.Reason == ServiceBusFailureReason.MessagingEntityAlreadyExists || sbe.IsTransient)
{
// carry on
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ namespace ServiceControl.Transports
public string ConnectionString { get; set; }
public string EndpointName { get; set; }
public string ErrorQueue { get; set; }
public System.Collections.Generic.IReadOnlySet<System.Type> EventTypesPublished { get; init; }
public int? MaxConcurrency { get; set; }
public bool RunCustomChecks { get; set; }
public string TransportType { get; set; }
Expand Down
3 changes: 3 additions & 0 deletions src/ServiceControl.Transports/TransportSettings.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace ServiceControl.Transports
{
using System;
using System.Collections.Generic;
using System.Runtime.Loader;
using NServiceBus.Settings;

Expand All @@ -18,6 +19,8 @@ public class TransportSettings : SettingsHolder

public bool RunCustomChecks { get; set; }

public IReadOnlySet<Type> EventTypesPublished { get; init; } = new HashSet<Type>();

public string ErrorQueue
{
set;
Expand Down
12 changes: 9 additions & 3 deletions src/ServiceControl/ComponentInstallationContext.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
namespace Particular.ServiceControl
{
using System;
using System.Collections.Generic;

class ComponentInstallationContext : IComponentInstallationContext
public class ComponentInstallationContext : IComponentInstallationContext
{
public List<string> Queues { get; } = [];
public IReadOnlyCollection<string> Queues => queuesToCreate;
public IReadOnlySet<Type> EventTypesPublished => eventTypePublished;

public void CreateQueue(string queueName) => Queues.Add(queueName);
public void CreateQueue(string queueName) => queuesToCreate.Add(queueName);
public void AddEventPublished<TEvent>() => eventTypePublished.Add(typeof(TEvent));

readonly List<string> queuesToCreate = [];
readonly HashSet<Type> eventTypePublished = [];
}
}
11 changes: 11 additions & 0 deletions src/ServiceControl/CustomChecks/CustomChecksComponent.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace ServiceControl.CustomChecks
{
using Connection;
using Contracts;
using ExternalIntegrations;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
Expand All @@ -10,6 +11,16 @@

class CustomChecksComponent : ServiceControlComponent
{
public override void Setup(Settings settings, IComponentInstallationContext context, IHostApplicationBuilder hostBuilder)
{
// Integration Events
if (!settings.DisableExternalIntegrationsPublishing)
{
context.AddEventPublished<CustomCheckFailed>();
context.AddEventPublished<CustomCheckSucceeded>();
}
}

public override void Configure(Settings settings, ITransportCustomization transportCustomization, IHostApplicationBuilder hostBuilder)
{
hostBuilder.Services.AddIntegrationEventPublisher<CustomCheckFailedPublisher>();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
namespace ServiceControl.ExternalIntegrations
{
using Infrastructure.DomainEvents;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Particular.ServiceControl;
using ServiceBus.Management.Infrastructure.Settings;
Expand All @@ -11,6 +13,12 @@ public override void Configure(Settings settings, ITransportCustomization transp
{
var services = hostBuilder.Services;
services.AddEventLogMapping<ExternalIntegrationEventFailedToBePublishedDefinition>();

if (!settings.DisableExternalIntegrationsPublishing)
{
services.AddHostedService<EventDispatcherHostedService>();
services.AddDomainEventHandler<IntegrationEventWriter>();
}
}
}
}

This file was deleted.

17 changes: 9 additions & 8 deletions src/ServiceControl/HostApplicationBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ namespace Particular.ServiceControl
using System.Diagnostics;
using System.Runtime.InteropServices;
using global::ServiceControl.CustomChecks;
using global::ServiceControl.ExternalIntegrations;
using global::ServiceControl.Hosting;
using global::ServiceControl.Infrastructure;
using global::ServiceControl.Infrastructure.BackgroundTasks;
Expand Down Expand Up @@ -45,8 +44,15 @@ public static void AddServiceControl(this IHostApplicationBuilder hostBuilder, S
hostBuilder.Logging.ClearProviders();
hostBuilder.Logging.ConfigureLogging(settings.LoggingSettings.LogLevel);

var componentSetupContext = new ComponentInstallationContext();
var serviceControlComponents = ServiceControlMainInstance.Components;
foreach (ServiceControlComponent component in serviceControlComponents)
{
component.Setup(settings, componentSetupContext, hostBuilder);
}

var services = hostBuilder.Services;
var transportSettings = settings.ToTransportSettings();
var transportSettings = settings.ToTransportSettings(componentSetupContext);
var transportCustomization = TransportFactory.Create(transportSettings);
transportCustomization.AddTransportForPrimary(services, transportSettings);

Expand Down Expand Up @@ -81,11 +87,6 @@ public static void AddServiceControl(this IHostApplicationBuilder hostBuilder, S
NServiceBusFactory.Configure(settings, transportCustomization, transportSettings, configuration);
hostBuilder.UseNServiceBus(configuration);

if (!settings.DisableExternalIntegrationsPublishing)
{
hostBuilder.AddExternalIntegrationEvents();
}

hostBuilder.AddServicePulseSignalRNotifier();
hostBuilder.AddEmailNotifications();
hostBuilder.AddAsyncTimer();
Expand All @@ -101,7 +102,7 @@ public static void AddServiceControl(this IHostApplicationBuilder hostBuilder, S
hostBuilder.AddWindowsServiceWithRequestTimeout();
}

hostBuilder.AddServiceControlComponents(settings, transportCustomization, ServiceControlMainInstance.Components);
hostBuilder.AddServiceControlComponents(componentSetupContext, settings, transportCustomization, serviceControlComponents);
}

public static void AddServiceControlInstallers(this IHostApplicationBuilder hostApplicationBuilder, Settings settings)
Expand Down
2 changes: 1 addition & 1 deletion src/ServiceControl/Hosting/Commands/SetupCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public override async Task Execute(HostArguments args, Settings settings)
}
else
{
var transportSettings = settings.ToTransportSettings();
var transportSettings = settings.ToTransportSettings(componentSetupContext);
transportSettings.RunCustomChecks = false;
var transportCustomization = TransportFactory.Create(transportSettings);

Expand Down
4 changes: 3 additions & 1 deletion src/ServiceControl/IComponentInstallationContext.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
namespace Particular.ServiceControl
{
interface IComponentInstallationContext
public interface IComponentInstallationContext
{
void CreateQueue(string queueName);

void AddEventPublished<TEvent>();
}
}
6 changes: 4 additions & 2 deletions src/ServiceControl/Infrastructure/Settings/Settings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace ServiceBus.Management.Infrastructure.Settings
using Microsoft.Extensions.Logging;
using NLog.Common;
using NServiceBus.Transport;
using Particular.ServiceControl;
using ServiceControl.Configuration;
using ServiceControl.Infrastructure;
using ServiceControl.Infrastructure.Settings;
Expand Down Expand Up @@ -238,7 +239,7 @@ public string GetConnectionString()
return connectionStringSettings?.ConnectionString;
}

public TransportSettings ToTransportSettings()
public TransportSettings ToTransportSettings(ComponentInstallationContext installationContext)
{
var transportSettings = new TransportSettings
{
Expand All @@ -247,7 +248,8 @@ public TransportSettings ToTransportSettings()
MaxConcurrency = MaximumConcurrencyLevel,
RunCustomChecks = true,
TransportType = TransportType,
AssemblyLoadContextResolver = AssemblyLoadContextResolver
AssemblyLoadContextResolver = AssemblyLoadContextResolver,
EventTypesPublished = installationContext.EventTypesPublished,
};
return transportSettings;
}
Expand Down
11 changes: 11 additions & 0 deletions src/ServiceControl/Monitoring/HeartbeatMonitoringComponent.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace ServiceControl.Monitoring
{
using Connection;
using Contracts;
using EndpointControl.Handlers;
using EventLog;
using ExternalIntegrations;
Expand All @@ -16,6 +17,16 @@

class HeartbeatMonitoringComponent : ServiceControlComponent
{
public override void Setup(Settings settings, IComponentInstallationContext context, IHostApplicationBuilder hostBuilder)
{
// Integration Events
if (!settings.DisableExternalIntegrationsPublishing)
{
context.AddEventPublished<HeartbeatRestored>();
context.AddEventPublished<HeartbeatStopped>();
}
}

public override void Configure(Settings settings, ITransportCustomization transportCustomization, IHostApplicationBuilder hostBuilder)
{
hostBuilder.Services.AddHostedService<HeartbeatMonitoringHostedService>();
Expand Down
47 changes: 32 additions & 15 deletions src/ServiceControl/Recoverability/RecoverabilityComponent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Threading;
using System.Threading.Tasks;
using Connection;
using Contracts;
using Contracts.MessageFailures;
using CustomChecks;
using EventLog;
Expand All @@ -20,9 +21,40 @@
using Retrying;
using ServiceBus.Management.Infrastructure.Settings;
using Transports;
using FailedMessagesUnArchived = Contracts.FailedMessagesUnArchived;
using MessageEditedAndRetried = Contracts.MessageEditedAndRetried;
using MessageFailed = Contracts.MessageFailed;
using MessageFailureResolvedByRetry = Contracts.MessageFailureResolvedByRetry;
using MessageFailureResolvedManually = Contracts.MessageFailureResolvedManually;

class RecoverabilityComponent : ServiceControlComponent
{
public override void Setup(Settings settings, IComponentInstallationContext context, IHostApplicationBuilder hostBuilder)
{
context.CreateQueue(settings.StagingQueue);

if (settings.IngestErrorMessages)
{
context.CreateQueue(settings.ErrorQueue);
}

if (settings.ForwardErrorMessages && settings.ErrorLogQueue != null)
{
context.CreateQueue(settings.ErrorLogQueue);
}

// Integration Events
if (!settings.DisableExternalIntegrationsPublishing)
{
context.AddEventPublished<FailedMessagesArchived>();
context.AddEventPublished<FailedMessagesUnArchived>();
context.AddEventPublished<MessageFailed>();
context.AddEventPublished<MessageFailureResolvedByRetry>();
context.AddEventPublished<MessageFailureResolvedManually>();
context.AddEventPublished<MessageEditedAndRetried>();
}
}

public override void Configure(Settings settings, ITransportCustomization transportCustomization, IHostApplicationBuilder hostBuilder)
{
var services = hostBuilder.Services;
Expand Down Expand Up @@ -106,21 +138,6 @@ public override void Configure(Settings settings, ITransportCustomization transp
services.AddEventLogMapping<MessagesSubmittedForRetryFailedDefinition>();
}

public override void Setup(Settings settings, IComponentInstallationContext context, IHostApplicationBuilder hostBuilder)
{
context.CreateQueue(settings.StagingQueue);

if (settings.IngestErrorMessages)
{
context.CreateQueue(settings.ErrorQueue);
}

if (settings.ForwardErrorMessages && settings.ErrorLogQueue != null)
{
context.CreateQueue(settings.ErrorLogQueue);
}
}

class FailedMessageNotificationsHostedService : IHostedService
{
public FailedMessageNotificationsHostedService(
Expand Down
Loading
Loading