EventBus
IEventHandler 事件处理器
public interface IEventHandler
{
}
ILocalEventHandler 本地事件处理器
public interface ILocalEventHandler<in TEvent> : IEventHandler
{
Task HandleEventAsync(TEvent eventData);
}
ActionEventHandler 通过委托处理的 EventHandler
public class ActionEventHandler<TEvent> :
ILocalEventHandler<TEvent>,
ITransientDependency
{
public Func<TEvent, Task> Action { get; }
public ActionEventHandler(Func<TEvent, Task> handler)
{
Action = handler;
}
public async Task HandleEventAsync(TEvent eventData)
{
await Action(eventData);
}
}
IDistributedEventHandler 分布式事件处理器
namespace Volo.Abp.EventBus.Distributed;
public interface IDistributedEventHandler<in TEvent> : IEventHandler
{
Task HandleEventAsync(TEvent eventData);
}
IEventHandlerInvoker
public interface IEventHandlerInvoker
{
Task InvokeAsync(IEventHandler eventHandler, object eventData, Type eventType);
}
//
public delegate Task EventHandlerMethodExecutorAsync(IEventHandler target,
object parameter);
public interface IEventHandlerMethodExecutor
{
EventHandlerMethodExecutorAsync ExecutorAsync { get; }
}
IEventHandlerFactory
获取 IEventHandler
public interface IEventHandlerDisposeWrapper : IDisposable
{
IEventHandler EventHandler { get; }
}
public class EventHandlerDisposeWrapper : IEventHandlerDisposeWrapper
{
public IEventHandler EventHandler { get; }
private readonly Action? _disposeAction;
public EventHandlerDisposeWrapper(IEventHandler eventHandler, Action?
disposeAction = null)
{
_disposeAction = disposeAction;
EventHandler = eventHandler;
}
//事件处理后的销毁工作
public void Dispose()
{
_disposeAction?.Invoke();
}
}
//
public interface IEventHandlerFactory
{
//获取 IEventHandler 带有 dispose 功能
IEventHandlerDisposeWrapper GetHandler();
//判断一个 IEventHandler 工厂是否能生成
bool IsInFactories(List<IEventHandlerFactory> handlerFactories);
}
IocEventHandlerFactory 在 ICO 里 指定的 HandlerType
public class IocEventHandlerFactory : IEventHandlerFactory, IDisposable
{
public Type HandlerType { get; }
protected IServiceScopeFactory ScopeFactory { get; }
public IocEventHandlerFactory(IServiceScopeFactory scopeFactory, Type handlerType)
{
ScopeFactory = scopeFactory;
HandlerType = handlerType;
}
public IEventHandlerDisposeWrapper GetHandler()
{
var scope = ScopeFactory.CreateScope();
return new EventHandlerDisposeWrapper(
(IEventHandler)scope.ServiceProvider.GetRequiredService(HandlerType),
// 销毁 Dispose,这里形成闭包,如果不是这个闭包,GetRequiredService 的服务获取的对象
// 在返回后是不可用的
() => scope.Dispose()
);
}
public bool IsInFactories(List<IEventHandlerFactory> handlerFactories)
{
//提供的 handlerFactories 是否有 HandlerType,没有的化会把 this 对象加入 handlerFactories
return handlerFactories
.OfType<IocEventHandlerFactory>()
.Any(f => f.HandlerType == HandlerType);
}
public void Dispose()
{
}
}
SingleInstanceHandlerFactory 唯一的 IEventHandler
public class SingleInstanceHandlerFactory : IEventHandlerFactory
{
public IEventHandler HandlerInstance { get; }
public SingleInstanceHandlerFactory(IEventHandler handler)
{
HandlerInstance = handler;
}
public IEventHandlerDisposeWrapper GetHandler()
{
return new EventHandlerDisposeWrapper(HandlerInstance);
}
public bool IsInFactories(List<IEventHandlerFactory> handlerFactories)
{
return handlerFactories
.OfType<SingleInstanceHandlerFactory>()
.Any(f => f.HandlerInstance == HandlerInstance);
}
}
TransientEventHandlerFactory 每次反射生成唯一的
public class TransientEventHandlerFactory : IEventHandlerFactory
{
public Type HandlerType { get; }
public TransientEventHandlerFactory(Type handlerType)
{
HandlerType = handlerType;
}
public virtual IEventHandlerDisposeWrapper GetHandler()
{
var handler = CreateHandler();
return new EventHandlerDisposeWrapper(
handler,
() => (handler as IDisposable)?.Dispose()
);
}
public bool IsInFactories(List<IEventHandlerFactory> handlerFactories)
{
return handlerFactories
.OfType<TransientEventHandlerFactory>()
.Any(f => f.HandlerType == HandlerType);
}
protected virtual IEventHandler CreateHandler()
{
return (IEventHandler)Activator.CreateInstance(HandlerType)!;
}
}
IEventBus 事件总线
public interface IEventBus
{
Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = true)
where TEvent : class;
Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true);
IDisposable Subscribe<TEvent>(Func<TEvent, Task> action)
where TEvent : class;
IDisposable Subscribe<TEvent, THandler>()
where TEvent : class
where THandler : IEventHandler, new();
IDisposable Subscribe(Type eventType, IEventHandler handler);
IDisposable Subscribe<TEvent>(IEventHandlerFactory factory)
where TEvent : class;
IDisposable Subscribe(Type eventType, IEventHandlerFactory factory);
void Unsubscribe<TEvent>(Func<TEvent, Task> action)
where TEvent : class;
void Unsubscribe<TEvent>(ILocalEventHandler<TEvent> handler)
where TEvent : class;
void Unsubscribe(Type eventType, IEventHandler handler);
void Unsubscribe<TEvent>(IEventHandlerFactory factory)
where TEvent : class;
void Unsubscribe(Type eventType, IEventHandlerFactory factory);
void UnsubscribeAll<TEvent>()
where TEvent : class;
void UnsubscribeAll(Type eventType);
}
EventBusBase 抽象实现基类
public abstract class EventBusBase : IEventBus
{
protected IServiceScopeFactory ServiceScopeFactory { get; }
protected ICurrentTenant CurrentTenant { get; }
protected IUnitOfWorkManager UnitOfWorkManager { get; }
protected IEventHandlerInvoker EventHandlerInvoker { get; }
//注入的服务表明它能做到的能力范围
protected EventBusBase(
IServiceScopeFactory serviceScopeFactory,
ICurrentTenant currentTenant,
IUnitOfWorkManager unitOfWorkManager,
IEventHandlerInvoker eventHandlerInvoker)
{
ServiceScopeFactory = serviceScopeFactory;
CurrentTenant = currentTenant;
UnitOfWorkManager = unitOfWorkManager;
EventHandlerInvoker = eventHandlerInvoker;
}
//Action 工厂
public virtual IDisposable Subscribe<TEvent>(Func<TEvent, Task> action)
where TEvent : class
{
return Subscribe(typeof(TEvent), new ActionEventHandler<TEvent>(action));
}
//瞬时工厂
public virtual IDisposable Subscribe<TEvent, THandler>()
where TEvent : class
where THandler : IEventHandler, new()
{
return Subscribe(typeof(TEvent), new TransientEventHandlerFactory<THandler>());
}
//指定的工厂,单例
public virtual IDisposable Subscribe(Type eventType, IEventHandler handler)
{
return Subscribe(eventType, new SingleInstanceHandlerFactory(handler));
}
//用户指定工厂
public virtual IDisposable Subscribe<TEvent>(IEventHandlerFactory factory)
where TEvent : class
{
return Subscribe(typeof(TEvent), factory);
}
//用户指定的工厂如何注册,子类自己实现
public abstract IDisposable Subscribe(Type eventType, IEventHandlerFactory factory);
public abstract void Unsubscribe<TEvent>(Func<TEvent, Task> action)
where TEvent : class;
public virtual void Unsubscribe<TEvent>(ILocalEventHandler<TEvent> handler)
where TEvent : class
{
Unsubscribe(typeof(TEvent), handler);
}
public abstract void Unsubscribe(Type eventType, IEventHandler handler);
public virtual void Unsubscribe<TEvent>(IEventHandlerFactory factory)
where TEvent : class
{
Unsubscribe(typeof(TEvent), factory);
}
public abstract void Unsubscribe(Type eventType, IEventHandlerFactory factory);
public virtual void UnsubscribeAll<TEvent>() where TEvent : class
{
UnsubscribeAll(typeof(TEvent));
}
public abstract void UnsubscribeAll(Type eventType);
public Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = true)
where TEvent : class
{
return PublishAsync(typeof(TEvent), eventData, onUnitOfWorkComplete);
}
//发布事件
public virtual async Task PublishAsync(
Type eventType,
object eventData,
bool onUnitOfWorkComplete = true)
{
// 只要在工作单元完成时,才发布事件,分布式事件,分俩步执行,先完成本地工作单元事务
// 在发布分布式事件
if (onUnitOfWorkComplete && UnitOfWorkManager.Current != null)
{
AddToUnitOfWork(
UnitOfWorkManager.Current,
new UnitOfWorkEventRecord(eventType, eventData,
EventOrderGenerator.GetNext())
);
return;
}
//发布事件
await PublishToEventBusAsync(eventType, eventData);
}
protected abstract Task PublishToEventBusAsync(Type eventType, object eventData);
protected abstract void AddToUnitOfWork(IUnitOfWork unitOfWork,
UnitOfWorkEventRecord eventRecord);
//发布事件
public virtual async Task TriggerHandlersAsync(Type eventType, object eventData)
{
var exceptions = new List<Exception>();
await TriggerHandlersAsync(eventType, eventData, exceptions);
if (exceptions.Any())
{
ThrowOriginalExceptions(eventType, exceptions);
}
}
//处理事件
protected virtual async Task TriggerHandlersAsync(Type eventType,
object eventData, List<Exception> exceptions, InboxConfig? inboxConfig = null)
{
await new SynchronizationContextRemover();
//获取所有处理事件的工厂类集合
foreach (var handlerFactories in GetHandlerFactories(eventType))
{
//触发所有的工厂
foreach (var handlerFactory in handlerFactories.EventHandlerFactories)
{
await TriggerHandlerAsync(handlerFactory, handlerFactories.EventType,
eventData, exceptions, inboxConfig);
}
}
//Implements generic argument inheritance. See IEventDataWithInheritableGenericArgument
if (eventType.GetTypeInfo().IsGenericType &&
eventType.GetGenericArguments().Length == 1 &&
typeof(IEventDataWithInheritableGenericArgument).IsAssignableFrom(eventType))
{
var genericArg = eventType.GetGenericArguments()[0];
var baseArg = genericArg.GetTypeInfo().BaseType;
if (baseArg != null)
{
var baseEventType = eventType.GetGenericTypeDefinition().
MakeGenericType(baseArg);
var constructorArgs =
((IEventDataWithInheritableGenericArgument)eventData).GetConstructorArgs();
var baseEventData =
Activator.CreateInstance(baseEventType, constructorArgs)!;
await PublishToEventBusAsync(baseEventType, baseEventData);
}
}
}
protected void ThrowOriginalExceptions(Type eventType, List<Exception> exceptions)
{
if (exceptions.Count == 1)
{
exceptions[0].ReThrow();
}
throw new AggregateException(
"More than one error has occurred while triggering the event: " + eventType,
exceptions
);
}
protected virtual void SubscribeHandlers(ITypeList<IEventHandler> handlers)
{
foreach (var handler in handlers)
{
var interfaces = handler.GetInterfaces();
foreach (var @interface in interfaces)
{
if (!typeof(IEventHandler).GetTypeInfo().IsAssignableFrom(@interface))
{
continue;
}
var genericArgs = @interface.GetGenericArguments();
if (genericArgs.Length == 1)
{
Subscribe(genericArgs[0],
new IocEventHandlerFactory(ServiceScopeFactory, handler));
}
}
}
}
protected abstract IEnumerable<EventTypeWithEventHandlerFactories>
GetHandlerFactories(Type eventType);
//
protected virtual async Task TriggerHandlerAsync(
IEventHandlerFactory asyncHandlerFactory, Type eventType,
object eventData, List<Exception> exceptions, InboxConfig? inboxConfig = null)
{
//处理完成在 dispose 方法里清除字典
using (var eventHandlerWrapper = asyncHandlerFactory.GetHandler())
{
try
{
var handlerType = eventHandlerWrapper.EventHandler.GetType();
if (inboxConfig?.HandlerSelector != null &&
!inboxConfig.HandlerSelector(handlerType))
{
return;
}
using (CurrentTenant.Change(GetEventDataTenantId(eventData)))
{
await InvokeEventHandlerAsync(
eventHandlerWrapper.EventHandler, eventData, eventType);
}
}
catch (TargetInvocationException ex)
{
exceptions.Add(ex.InnerException!);
}
catch (Exception ex)
{
exceptions.Add(ex);
}
}
}
protected virtual Task InvokeEventHandlerAsync(
IEventHandler eventHandler, object eventData, Type eventType)
{
return EventHandlerInvoker.InvokeAsync(eventHandler, eventData, eventType);
}
protected virtual Guid? GetEventDataTenantId(object eventData)
{
return eventData switch
{
IMultiTenant multiTenantEventData =>
multiTenantEventData.TenantId,
IEventDataMayHaveTenantId eventDataMayHaveTenantId
when eventDataMayHaveTenantId.IsMultiTenant(out var tenantId) => tenantId,
_ => CurrentTenant.Id
};
}
protected class EventTypeWithEventHandlerFactories
{
public Type EventType { get; }
public List<IEventHandlerFactory> EventHandlerFactories { get; }
public EventTypeWithEventHandlerFactories(Type eventType,
List<IEventHandlerFactory> eventHandlerFactories)
{
EventType = eventType;
EventHandlerFactories = eventHandlerFactories;
}
}
// Reference from
// https://blogs.msdn.microsoft.com/benwilli/2017/
// 02/09/an-alternative-to-configureawaitfalse-everywhere/
protected struct SynchronizationContextRemover : INotifyCompletion
{
public bool IsCompleted {
get { return SynchronizationContext.Current == null; }
}
public void OnCompleted(Action continuation)
{
var prevContext = SynchronizationContext.Current;
try
{
SynchronizationContext.SetSynchronizationContext(null);
continuation();
}
finally
{
SynchronizationContext.SetSynchronizationContext(prevContext);
}
}
public SynchronizationContextRemover GetAwaiter()
{
return this;
}
public void GetResult()
{
}
}
}
EventHandlerInvoker 处理事件的最底层
public delegate Task EventHandlerMethodExecutorAsync(IEventHandler target, object parameter);
public interface IEventHandlerMethodExecutor
{
EventHandlerMethodExecutorAsync ExecutorAsync { get; }
}
public class LocalEventHandlerMethodExecutor<TEvent> : IEventHandlerMethodExecutor
where TEvent : class
{
public EventHandlerMethodExecutorAsync ExecutorAsync => (target, parameter) =>
target.As<ILocalEventHandler<TEvent>>().HandleEventAsync(parameter.As<TEvent>());
//执行委托
public Task ExecuteAsync(IEventHandler target, TEvent parameters)
{
return ExecutorAsync(target, parameters);
}
}
public class DistributedEventHandlerMethodExecutor<TEvent> : IEventHandlerMethodExecutor
where TEvent : class
{
public EventHandlerMethodExecutorAsync ExecutorAsync => (target, parameter) =>
target.As<IDistributedEventHandler<TEvent>>().HandleEventAsync(parameter.As<TEvent>());
public Task ExecuteAsync(IEventHandler target, TEvent parameters)
{
return ExecutorAsync(target, parameters);
}
}
EventHandlerInvoker 作为单例
public class EventHandlerInvoker : IEventHandlerInvoker, ISingletonDependency
{
//因为是反射生成处理执行器,这里使用缓存
private readonly ConcurrentDictionary<string, EventHandlerInvokerCacheItem> _cache;
public EventHandlerInvoker()
{
_cache = new ConcurrentDictionary<string, EventHandlerInvokerCacheItem>();
}
public async Task InvokeAsync(IEventHandler eventHandler, object eventData,
Type eventType)
{
var cacheItem = _cache.GetOrAdd(
$"{eventHandler.GetType().FullName}-{eventType.FullName}", _ =>
{
var item = new EventHandlerInvokerCacheItem();
if (typeof(ILocalEventHandler<>).
MakeGenericType(eventType).IsInstanceOfType(eventHandler))
{
item.Local = (IEventHandlerMethodExecutor?)Activator.
CreateInstance(typeof(LocalEventHandlerMethodExecutor<>).
MakeGenericType(eventType));
}
if (typeof(IDistributedEventHandler<>).
MakeGenericType(eventType).IsInstanceOfType(eventHandler))
{
item.Distributed = (IEventHandlerMethodExecutor?)Activator.
CreateInstance(typeof(DistributedEventHandlerMethodExecutor<>).
MakeGenericType(eventType));
}
return item;
});
if (cacheItem.Local != null)
{
//执行本地事件处理器
await cacheItem.Local.ExecutorAsync(eventHandler, eventData);
}
if (cacheItem.Distributed != null)
{
//执行分布式事件处理器
await cacheItem.Distributed.ExecutorAsync(eventHandler, eventData);
}
if (cacheItem.Local == null && cacheItem.Distributed == null)
{
throw new AbpException("The object instance is not an event handler.
Object type: " + eventHandler.GetType().AssemblyQualifiedName);
}
}
}
ILocalEventBus 本地事件总线
public interface ILocalEventBus : IEventBus
{
IDisposable Subscribe<TEvent>(ILocalEventHandler<TEvent> handler)
where TEvent : class;
}
LocalEventBus 的实现
[ExposeServices(typeof(ILocalEventBus), typeof(LocalEventBus))]
public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
{
public ILogger<LocalEventBus> Logger { get; set; }
protected AbpLocalEventBusOptions Options { get; }
protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories
{ get; }
public LocalEventBus(
IOptions<AbpLocalEventBusOptions> options,
IServiceScopeFactory serviceScopeFactory,
ICurrentTenant currentTenant,
IUnitOfWorkManager unitOfWorkManager,
IEventHandlerInvoker eventHandlerInvoker)
: base(serviceScopeFactory, currentTenant, unitOfWorkManager, eventHandlerInvoker)
{
Options = options.Value;
Logger = NullLogger<LocalEventBus>.Instance;
HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
SubscribeHandlers(Options.Handlers);
}
public virtual IDisposable Subscribe<TEvent>(ILocalEventHandler<TEvent> handler)
where TEvent : class
{
return Subscribe(typeof(TEvent), handler);
}
//
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
{
//事件总线维护并发字典,保存 eventType 和一些例的 HandlerFactory 的字典
//本地字典里获取 eventType 相应的 HandlerFactory 集合
GetOrCreateHandlerFactories(eventType)
.Locking(factories =>
{
// factory 是要使用的具体工厂在本地事件总线维护的工厂里是否维护过,
//没有维护过就加入
if (!factory.IsInFactories(factories))
{
factories.Add(factory);
}
}
);
//取消订阅
return new EventHandlerFactoryUnregistrar(this, eventType, factory);
}
public override void Unsubscribe<TEvent>(Func<TEvent, Task> action)
{
Check.NotNull(action, nameof(action));
GetOrCreateHandlerFactories(typeof(TEvent))
.Locking(factories =>
{
factories.RemoveAll(
factory =>
{
var singleInstanceFactory = factory as
SingleInstanceHandlerFactory;
if (singleInstanceFactory == null)
{
return false;
}
var actionHandler = singleInstanceFactory.HandlerInstance
as ActionEventHandler<TEvent>;
if (actionHandler == null)
{
return false;
}
return actionHandler.Action == action;
});
});
}
//多指定的事件,移除指定的唯一事件处理器
public override void Unsubscribe(Type eventType, IEventHandler handler)
{
//查询指定事件类型的所有处理器工厂
GetOrCreateHandlerFactories(eventType)
.Locking(factories =>
{
//异常指定的处理器工厂,这里因为指定了具体的处理器实例,所以移除的是
// SingleInstanceHandlerFactory 工厂
factories.RemoveAll(
factory =>
factory is SingleInstanceHandlerFactory &&
((factory as SingleInstanceHandlerFactory)!).
HandlerInstance == handler
);
});
}
//直接异常指定的工厂
public override void Unsubscribe(Type eventType, IEventHandlerFactory factory)
{
GetOrCreateHandlerFactories(eventType).Locking(factories =>
factories.Remove(factory));
}
//取消所有的事件处理器
public override void UnsubscribeAll(Type eventType)
{
GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear());
}
protected override async Task PublishToEventBusAsync(Type eventType, object eventData)
{
await PublishAsync(new LocalEventMessage(Guid.NewGuid(), eventData, eventType));
}
protected override void AddToUnitOfWork(IUnitOfWork unitOfWork,
UnitOfWorkEventRecord eventRecord)
{
//给工作单元增加事件记录,
unitOfWork.AddOrReplaceLocalEvent(eventRecord);
}
//发布事件
public virtual async Task PublishAsync(LocalEventMessage localEventMessage)
{
await TriggerHandlersAsync(localEventMessage.EventType, localEventMessage.EventData);
}
protected override IEnumerable<EventTypeWithEventHandlerFactories>
GetHandlerFactories(Type eventType)
{
var handlerFactoryList = new List<Tuple<IEventHandlerFactory, Type, int>>();
foreach (var handlerFactory in HandlerFactories.Where(hf =>
ShouldTriggerEventForHandler(eventType, hf.Key)))
{
foreach (var factory in handlerFactory.Value)
{
handlerFactoryList.Add(new Tuple<IEventHandlerFactory, Type, int>(
factory,
handlerFactory.Key,
ReflectionHelper.GetAttributesOfMemberOrDeclaringType<
LocalEventHandlerOrderAttribute>
(factory.GetHandler().EventHandler.GetType()).FirstOrDefault()?.Order ?? 0));
}
}
return handlerFactoryList.OrderBy(x => x.Item3).Select(x =>
new EventTypeWithEventHandlerFactories(
x.Item2, new List<IEventHandlerFactory> {x.Item1})).ToArray();
}
private List<IEventHandlerFactory> GetOrCreateHandlerFactories(Type eventType)
{
return HandlerFactories.GetOrAdd(eventType, (type) =>
new List<IEventHandlerFactory>());
}
private static bool ShouldTriggerEventForHandler(
Type targetEventType, Type handlerEventType)
{
//Should trigger same type
if (handlerEventType == targetEventType)
{
return true;
}
//Should trigger for inherited types
if (handlerEventType.IsAssignableFrom(targetEventType))
{
return true;
}
return false;
}
// Internal for unit testing
internal Func<Type, object, Task>? OnEventHandleInvoking { get; set; }
// Internal for unit testing
protected async override Task InvokeEventHandlerAsync(
IEventHandler eventHandler, object eventData, Type eventType)
{
if (OnEventHandleInvoking != null && eventType !=
typeof(DistributedEventSent) && eventType != typeof(DistributedEventReceived))
{
await OnEventHandleInvoking(eventType, eventData);
}
await base.InvokeEventHandlerAsync(eventHandler, eventData, eventType);
}
// Internal for unit testing
internal Func<Type, object, Task>? OnPublishing { get; set; }
// For unit testing
public async override Task PublishAsync(
Type eventType,
object eventData,
bool onUnitOfWorkComplete = true)
{
if (onUnitOfWorkComplete && UnitOfWorkManager.Current != null)
{
AddToUnitOfWork(
UnitOfWorkManager.Current,
new UnitOfWorkEventRecord(
eventType, eventData, EventOrderGenerator.GetNext())
);
return;
}
// For unit testing
if (OnPublishing != null && eventType !=
typeof(DistributedEventSent) && eventType != typeof(DistributedEventReceived))
{
await OnPublishing(eventType, eventData);
}
await PublishToEventBusAsync(eventType, eventData);
}
}
IDistributedEventBus 分布式事件总线
public interface IDistributedEventBus : IEventBus
{
IDisposable Subscribe<TEvent>(IDistributedEventHandler<TEvent> handler)
where TEvent : class;
Task PublishAsync<TEvent>(
TEvent eventData,
bool onUnitOfWorkComplete = true,
bool useOutbox = true)
where TEvent : class;
Task PublishAsync(
Type eventType,
object eventData,
bool onUnitOfWorkComplete = true,
bool useOutbox = true);
}
namespace Volo.Abp.EventBus.Distributed;
public abstract class DistributedEventBusBase :
EventBusBase, IDistributedEventBus, ISupportsEventBoxes
{
protected IGuidGenerator GuidGenerator { get; }
protected IClock Clock { get; }
protected AbpDistributedEventBusOptions AbpDistributedEventBusOptions { get; }
protected ILocalEventBus LocalEventBus { get; }
protected ICorrelationIdProvider CorrelationIdProvider { get; }
protected DistributedEventBusBase(
IServiceScopeFactory serviceScopeFactory,
ICurrentTenant currentTenant,
IUnitOfWorkManager unitOfWorkManager,
IOptions<AbpDistributedEventBusOptions> abpDistributedEventBusOptions,
IGuidGenerator guidGenerator,
IClock clock,
IEventHandlerInvoker eventHandlerInvoker,
ILocalEventBus localEventBus,
ICorrelationIdProvider correlationIdProvider) : base(
serviceScopeFactory,
currentTenant,
unitOfWorkManager,
eventHandlerInvoker)
{
GuidGenerator = guidGenerator;
Clock = clock;
AbpDistributedEventBusOptions = abpDistributedEventBusOptions.Value;
LocalEventBus = localEventBus;
CorrelationIdProvider = correlationIdProvider;
}
public IDisposable Subscribe<TEvent>(
IDistributedEventHandler<TEvent> handler) where TEvent : class
{
return Subscribe(typeof(TEvent), handler);
}
public override Task PublishAsync(
Type eventType, object eventData, bool onUnitOfWorkComplete = true)
{
return PublishAsync(eventType, eventData, onUnitOfWorkComplete, useOutbox: true);
}
public Task PublishAsync<TEvent>(
TEvent eventData,
bool onUnitOfWorkComplete = true,
bool useOutbox = true)
where TEvent : class
{
return PublishAsync(typeof(TEvent), eventData, onUnitOfWorkComplete, useOutbox);
}
public async Task PublishAsync(
Type eventType,
object eventData,
bool onUnitOfWorkComplete = true,
bool useOutbox = true)
{
if (onUnitOfWorkComplete && UnitOfWorkManager.Current != null)
{
AddToUnitOfWork(
UnitOfWorkManager.Current,
new UnitOfWorkEventRecord(
eventType, eventData, EventOrderGenerator.GetNext(), useOutbox)
);
return;
}
if (useOutbox)
{
if (await AddToOutboxAsync(eventType, eventData))
{
return;
}
}
await TriggerDistributedEventSentAsync(new DistributedEventSent()
{
Source = DistributedEventSource.Direct,
EventName = EventNameAttribute.GetNameOrDefault(eventType),
EventData = eventData
});
await PublishToEventBusAsync(eventType, eventData);
}
public abstract Task PublishFromOutboxAsync(
OutgoingEventInfo outgoingEvent,
OutboxConfig outboxConfig
);
public abstract Task PublishManyFromOutboxAsync(
IEnumerable<OutgoingEventInfo> outgoingEvents,
OutboxConfig outboxConfig
);
public abstract Task ProcessFromInboxAsync(
IncomingEventInfo incomingEvent,
InboxConfig inboxConfig);
protected virtual async Task<bool> AddToOutboxAsync(Type eventType, object eventData)
{
var unitOfWork = UnitOfWorkManager.Current;
if (unitOfWork == null)
{
return false;
}
foreach (var outboxConfig in AbpDistributedEventBusOptions.Outboxes.Values.
OrderBy(x => x.Selector is null))
{
if (outboxConfig.Selector == null || outboxConfig.Selector(eventType))
{
var eventOutbox = (IEventOutbox)unitOfWork.ServiceProvider.
GetRequiredService(outboxConfig.ImplementationType);
var eventName = EventNameAttribute.GetNameOrDefault(eventType);
await OnAddToOutboxAsync(eventName, eventType, eventData);
var outgoingEventInfo = new OutgoingEventInfo(
GuidGenerator.Create(),
eventName,
Serialize(eventData),
Clock.Now
);
outgoingEventInfo.SetCorrelationId(CorrelationIdProvider.Get()!);
await eventOutbox.EnqueueAsync(outgoingEventInfo);
return true;
}
}
return false;
}
protected virtual Task OnAddToOutboxAsync(string eventName,
Type eventType, object eventData)
{
return Task.CompletedTask;
}
protected async Task<bool> AddToInboxAsync(
string? messageId,
string eventName,
Type eventType,
object eventData,
string? correlationId)
{
if (AbpDistributedEventBusOptions.Inboxes.Count <= 0)
{
return false;
}
using (var scope = ServiceScopeFactory.CreateScope())
{
foreach (var inboxConfig in AbpDistributedEventBusOptions.
Inboxes.Values.OrderBy(x => x.EventSelector is null))
{
if (inboxConfig.EventSelector == null ||
inboxConfig.EventSelector(eventType))
{
var eventInbox =
(IEventInbox)scope.ServiceProvider.
GetRequiredService(inboxConfig.ImplementationType);
if (!messageId.IsNullOrEmpty())
{
if (await eventInbox.ExistsByMessageIdAsync(messageId!))
{
continue;
}
}
var incomingEventInfo = new IncomingEventInfo(
GuidGenerator.Create(),
messageId!,
eventName,
Serialize(eventData),
Clock.Now
);
incomingEventInfo.SetCorrelationId(correlationId!);
await eventInbox.EnqueueAsync(incomingEventInfo);
}
}
}
return true;
}
protected abstract byte[] Serialize(object eventData);
protected virtual async Task TriggerHandlersDirectAsync(
Type eventType, object eventData)
{
await TriggerDistributedEventReceivedAsync(new DistributedEventReceived
{
Source = DistributedEventSource.Direct,
EventName = EventNameAttribute.GetNameOrDefault(eventType),
EventData = eventData
});
await TriggerHandlersAsync(eventType, eventData);
}
protected virtual async Task TriggerHandlersFromInboxAsync(
Type eventType, object eventData,
List<Exception> exceptions, InboxConfig? inboxConfig = null)
{
await TriggerDistributedEventReceivedAsync(new DistributedEventReceived
{
Source = DistributedEventSource.Inbox,
EventName = EventNameAttribute.GetNameOrDefault(eventType),
EventData = eventData
});
await TriggerHandlersAsync(eventType, eventData, exceptions, inboxConfig);
}
public virtual async Task TriggerDistributedEventSentAsync(
DistributedEventSent distributedEvent)
{
try
{
await LocalEventBus.PublishAsync(distributedEvent);
}
catch (Exception _)
{
// ignored
}
}
public virtual async Task TriggerDistributedEventReceivedAsync(
DistributedEventReceived distributedEvent)
{
try
{
await LocalEventBus.PublishAsync(distributedEvent);
}
catch (Exception _)
{
// ignored
}
}
}
分布式事件总线基础概念支撑
OutgoingEventRecord 发件箱持有的数据
volo.Abp.EntityFrameworkCore 模块定义的聚合根
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents;
public class OutgoingEventRecord :
BasicAggregateRoot<Guid>,
IHasExtraProperties,
IHasCreationTime
{
public static int MaxEventNameLength { get; set; } = 256;
public ExtraPropertyDictionary ExtraProperties { get; private set; }
//事件名称
public string EventName { get; private set; } = default!;
//事件包含的数据是可序列化,可网络传递的数据
public byte[] EventData { get; private set; } = default!;
public DateTime CreationTime { get; private set; }
protected OutgoingEventRecord()
{
ExtraProperties = new ExtraPropertyDictionary();
this.SetDefaultsForExtraProperties();
}
public OutgoingEventRecord(
OutgoingEventInfo eventInfo)
: base(eventInfo.Id)
{
EventName = eventInfo.EventName;
EventData = eventInfo.EventData;
CreationTime = eventInfo.CreationTime;
ExtraProperties = new ExtraPropertyDictionary();
this.SetDefaultsForExtraProperties();
}
public OutgoingEventInfo ToOutgoingEventInfo()
{
var info = new OutgoingEventInfo(
Id,
EventName,
EventData,
CreationTime
);
foreach (var property in ExtraProperties)
{
info.SetProperty(property.Key, property.Value);
}
return info;
}
}
IncomingEventRecord 收件箱持有的数据
volo.Abp.EntityFrameworkCore 模块定义的聚合根
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents;
public class IncomingEventRecord :
BasicAggregateRoot<Guid>,
IHasExtraProperties,
IHasCreationTime
{
public static int MaxEventNameLength { get; set; } = 256;
public ExtraPropertyDictionary ExtraProperties { get; private set; }
//消息 ID rabbit mq 需要
public string MessageId { get; private set; } = default!;
public string EventName { get; private set; } = default!;
public byte[] EventData { get; private set; } = default!;
public DateTime CreationTime { get; private set; }
//是否处理
public bool Processed { get; set; }
//处理时间
public DateTime? ProcessedTime { get; set; }
protected IncomingEventRecord()
{
ExtraProperties = new ExtraPropertyDictionary();
this.SetDefaultsForExtraProperties();
}
public IncomingEventRecord(
IncomingEventInfo eventInfo)
: base(eventInfo.Id)
{
MessageId = eventInfo.MessageId;
EventName = eventInfo.EventName;
EventData = eventInfo.EventData;
CreationTime = eventInfo.CreationTime;
ExtraProperties = new ExtraPropertyDictionary();
this.SetDefaultsForExtraProperties();
}
public IncomingEventInfo ToIncomingEventInfo()
{
var info = new IncomingEventInfo(
Id,
MessageId,
EventName,
EventData,
CreationTime
);
foreach (var property in ExtraProperties)
{
info.SetProperty(property.Key, property.Value);
}
return info;
}
public void MarkAsProcessed(DateTime processedTime)
{
Processed = true;
ProcessedTime = processedTime;
}
}
IHasEventInbox IHasEventOutbox 数据库上下文
数据库上下文持有的收发件箱的数据
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents;
public interface IHasEventInbox : IEfCoreDbContext
{
DbSet<IncomingEventRecord> IncomingEvents { get; set; }
}
public interface IHasEventOutbox : IEfCoreDbContext
{
DbSet<OutgoingEventRecord> OutgoingEvents { get; set; }
}
收发件箱提供的服务
namespace Volo.Abp.EventBus.Distributed;
public interface IEventOutbox
{
//加入发件箱
Task EnqueueAsync(OutgoingEventInfo outgoingEvent);
//获取发件箱内的 OutgoingEvent
Task<List<OutgoingEventInfo>> GetWaitingEventsAsync(int maxCount,
CancellationToken cancellationToken = default);
//删除一个发件信息
Task DeleteAsync(Guid id);
//删除多个发件信息
Task DeleteManyAsync(IEnumerable<Guid> ids);
}
public interface IEventInbox
{
//收件箱加入信息
Task EnqueueAsync(IncomingEventInfo incomingEvent);
//获取所有收件信息
Task<List<IncomingEventInfo>> GetWaitingEventsAsync(int maxCount,
CancellationToken cancellationToken = default);
//标记事件已经处理
Task MarkAsProcessedAsync(Guid id);
//判断消息是否存在收件箱
Task<bool> ExistsByMessageIdAsync(string messageId);
//删除所有已经处理的信息
Task DeleteOldEventsAsync();
}
服务的提供者通过 ef core,进行提供
public interface IDbContextEventOutbox<TDbContext> : IEventOutbox
where TDbContext : IHasEventOutbox
{
}
public interface IDbContextEventInbox<TDbContext> : IEventInbox
where TDbContext : IHasEventInbox
{
}
IDbContextEventInbox 的实现 收件箱处理后的不会立即删除会有个缓存时间
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents;
public class DbContextEventInbox<TDbContext> : IDbContextEventInbox<TDbContext>
where TDbContext : IHasEventInbox
{
protected IDbContextProvider<TDbContext> DbContextProvider { get; }
protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; }
protected IClock Clock { get; }
// IDbContextProvider<TDbContext> 获取上下文提供器
public DbContextEventInbox(
IDbContextProvider<TDbContext> dbContextProvider,
IClock clock,
IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions)
{
DbContextProvider = dbContextProvider;
Clock = clock;
EventBusBoxesOptions = eventBusBoxesOptions.Value;
}
[UnitOfWork]
public virtual async Task EnqueueAsync(IncomingEventInfo incomingEvent)
{
//因为不是默认的支持工作单元的服务,故这里手动加 UnitOfWork,如果
//使用了工作单元中间件,会使用工作单元中间件的工作单元处理
//如果没有工作单元中间件,就新开工作单元处理
var dbContext = await DbContextProvider.GetDbContextAsync();
dbContext.IncomingEvents.Add(new IncomingEventRecord(incomingEvent));
}
[UnitOfWork]
public virtual async Task<List<IncomingEventInfo>> GetWaitingEventsAsync(
int maxCount, CancellationToken cancellationToken = default)
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var outgoingEventRecords = await dbContext
.IncomingEvents
.AsNoTracking()
.Where(x => !x.Processed)
.OrderBy(x => x.CreationTime)
.Take(maxCount)
.ToListAsync(cancellationToken: cancellationToken);
return outgoingEventRecords
.Select(x => x.ToIncomingEventInfo())
.ToList();
}
//标记为已经处理
[UnitOfWork]
public virtual async Task MarkAsProcessedAsync(Guid id)
{
var dbContext = await DbContextProvider.GetDbContextAsync();
await dbContext.IncomingEvents.Where(x => x.Id == id).ExecuteUpdateAsync(x =>
x.SetProperty(p => p.Processed, _ => true).SetProperty(p =>
p.ProcessedTime, _ => Clock.Now));
}
[UnitOfWork]
public virtual async Task<bool> ExistsByMessageIdAsync(string messageId)
{
var dbContext = await DbContextProvider.GetDbContextAsync();
return await dbContext.IncomingEvents.AnyAsync(x => x.MessageId == messageId);
}
//删除已经处理的,事件生成时间超出范围的收件箱记录
[UnitOfWork]
public virtual async Task DeleteOldEventsAsync()
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var timeToKeepEvents = Clock.Now -
EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents;
await dbContext.IncomingEvents
.Where(x => x.Processed && x.CreationTime < timeToKeepEvents)
.ExecuteDeleteAsync();
}
}
IDbContextEventOutbox 的实现
public class DbContextEventOutbox<TDbContext> : IDbContextEventOutbox<TDbContext>
where TDbContext : IHasEventOutbox
{
protected IDbContextProvider<TDbContext> DbContextProvider { get; }
public DbContextEventOutbox(
IDbContextProvider<TDbContext> dbContextProvider)
{
DbContextProvider = dbContextProvider;
}
[UnitOfWork]
public virtual async Task EnqueueAsync(OutgoingEventInfo outgoingEvent)
{
var dbContext = (IHasEventOutbox)await DbContextProvider.GetDbContextAsync();
dbContext.OutgoingEvents.Add(new OutgoingEventRecord(outgoingEvent));
}
//获取所有待处理的发件箱记录,因为方法以Get 开头不会加入事务
[UnitOfWork]
public virtual async Task<List<OutgoingEventInfo>>
GetWaitingEventsAsync(int maxCount, CancellationToken cancellationToken = default)
{
var dbContext = (IHasEventOutbox)await DbContextProvider.GetDbContextAsync();
var outgoingEventRecords = await dbContext
.OutgoingEvents
.AsNoTracking()
.OrderBy(x => x.CreationTime)
.Take(maxCount)
.ToListAsync(cancellationToken: cancellationToken);
return outgoingEventRecords
.Select(x => x.ToOutgoingEventInfo())
.ToList();
}
//删除指定的
[UnitOfWork]
public virtual async Task DeleteAsync(Guid id)
{
var dbContext = (IHasEventOutbox)await DbContextProvider.GetDbContextAsync();
await dbContext.OutgoingEvents.Where(x => x.Id == id).ExecuteDeleteAsync();
}
[UnitOfWork]
public virtual async Task DeleteManyAsync(IEnumerable<Guid> ids)
{
var dbContext = (IHasEventOutbox)await DbContextProvider.GetDbContextAsync();
await dbContext.OutgoingEvents.Where(x => ids.Contains(x.Id)).ExecuteDeleteAsync();
}
}
收发件箱的配置
public class InboxConfig
{
[NotNull]
public string Name { get; }
[NotNull]
public string DatabaseName {
get => _databaseName;
set => _databaseName = Check.NotNullOrWhiteSpace(value, nameof(DatabaseName));
}
//数据库连接字符串
[NotNull] private string _databaseName = default!;
//收件箱提供服务的类型
public Type ImplementationType { get; set; } = default!;
//进入收件箱的过滤器,收件箱不是所有事件都收集
public Func<Type, bool>? EventSelector { get; set; }
//收件箱 事件处理器过滤器,收件箱里可以指定事件处理器是否可以处理事件
public Func<Type, bool>? HandlerSelector { get; set; }
/// <summary>
/// Used to enable/disable processing incoming events.
/// Default: true.
/// </summary>
public bool IsProcessingEnabled { get; set; } = true;
public InboxConfig([NotNull] string name)
{
Name = Check.NotNullOrWhiteSpace(name, nameof(name));
}
}
public class OutboxConfig
{
[NotNull]
public string Name { get; }
[NotNull]
public string DatabaseName {
get => _databaseName;
set => _databaseName = Check.NotNullOrWhiteSpace(value, nameof(DatabaseName));
}
//发件箱数据库连接字符串
[NotNull] private string _databaseName = default!;
//发件箱处理服务类
public Type ImplementationType { get; set; } = default!;
//选择进入发件箱的事件类型
public Func<Type, bool>? Selector { get; set; }
/// <summary>
/// Used to enable/disable sending events from outbox to the message broker.
/// Default: true.
/// </summary>
public bool IsSendingEnabled { get; set; } = true;
public OutboxConfig([NotNull] string name)
{
Name = Check.NotNullOrWhiteSpace(name, nameof(name));
}
}
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents;
public static class EfCoreInboxConfigExtensions
{
public static void UseDbContext<TDbContext>(this InboxConfig outboxConfig)
where TDbContext : IHasEventInbox
{
//指定服务实现类 DbContextEventInbox<>
outboxConfig.ImplementationType = typeof(IDbContextEventInbox<TDbContext>);
//通过 db context 上下文 获取的 Attribute 获取链接字符串
outboxConfig.DatabaseName = ConnectionStringNameAttribute.
GetConnStringName<TDbContext>();
}
}
public static class EfCoreOutboxConfigExtensions
{
public static void UseDbContext<TDbContext>(this OutboxConfig outboxConfig)
where TDbContext : IHasEventOutbox
{
outboxConfig.ImplementationType = typeof(IDbContextEventOutbox<TDbContext>);
outboxConfig.DatabaseName = ConnectionStringNameAttribute.
GetConnStringName<TDbContext>();
}
}
DbContextEventOutbox , DbContextEventInbox 注入的地方
namespace Volo.Abp.EntityFrameworkCore;
[DependsOn(typeof(AbpDddDomainModule))]
public class AbpEntityFrameworkCoreModule : AbpModule
{
public override void ConfigureServices(ServiceConfigurationContext context)
{
Configure<AbpDbContextOptions>(options =>
{
options.PreConfigure(abpDbContextConfigurationContext =>
{
abpDbContextConfigurationContext.DbContextOptions
.ConfigureWarnings(warnings =>
{
warnings.Ignore(CoreEventId.LazyLoadOnDisposedContextWarning);
});
});
});
context.Services.TryAddTransient(typeof(IDbContextProvider<>),
typeof(UnitOfWorkDbContextProvider<>));
context.Services.AddTransient(typeof(IDbContextEventOutbox<>),
typeof(DbContextEventOutbox<>));
context.Services.AddTransient(typeof(IDbContextEventInbox<>),
typeof(DbContextEventInbox<>));
}
}
IInboxProcessor 处理收件箱
public interface IInboxProcessor
{
Task StartAsync(InboxConfig inboxConfig, CancellationToken
cancellationToken = default);
Task StopAsync(CancellationToken cancellationToken = default);
}
namespace Volo.Abp.EventBus.Distributed;
public class InboxProcessor : IInboxProcessor, ITransientDependency
{
protected IServiceProvider ServiceProvider { get; }
protected AbpAsyncTimer Timer { get; }
protected IDistributedEventBus DistributedEventBus { get; }
protected IAbpDistributedLock DistributedLock { get; }
protected IUnitOfWorkManager UnitOfWorkManager { get; }
protected IClock Clock { get; }
protected IEventInbox Inbox { get; private set; } = default!;
protected InboxConfig InboxConfig { get; private set; } = default!;
protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; }
protected DateTime? LastCleanTime { get; set; }
protected string DistributedLockName { get; private set; } = default!;
public ILogger<InboxProcessor> Logger { get; set; }
protected CancellationTokenSource StoppingTokenSource { get; }
protected CancellationToken StoppingToken { get; }
//
public InboxProcessor(
IServiceProvider serviceProvider,
AbpAsyncTimer timer,
IDistributedEventBus distributedEventBus,
IAbpDistributedLock distributedLock,
IUnitOfWorkManager unitOfWorkManager,
IClock clock,
IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions)
{
ServiceProvider = serviceProvider;
Timer = timer;
DistributedEventBus = distributedEventBus;
DistributedLock = distributedLock;
UnitOfWorkManager = unitOfWorkManager;
Clock = clock;
EventBusBoxesOptions = eventBusBoxesOptions.Value;
Timer.Period = Convert.ToInt32(EventBusBoxesOptions.PeriodTimeSpan.
TotalMilliseconds);
Timer.Elapsed += TimerOnElapsed;
Logger = NullLogger<InboxProcessor>.Instance;
StoppingTokenSource = new CancellationTokenSource();
StoppingToken = StoppingTokenSource.Token;
}
private async Task TimerOnElapsed(AbpAsyncTimer arg)
{
//定时发送消息及清理收件箱
await RunAsync();
}
//启动定时器
public Task StartAsync(InboxConfig inboxConfig,
CancellationToken cancellationToken = default)
{
InboxConfig = inboxConfig;
Inbox = (IEventInbox)ServiceProvider.GetRequiredService(
inboxConfig.ImplementationType);
DistributedLockName = $"AbpInbox_{InboxConfig.DatabaseName}";
Timer.Start(cancellationToken);
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken = default)
{
StoppingTokenSource.Cancel();
Timer.Stop(cancellationToken);
StoppingTokenSource.Dispose();
return Task.CompletedTask;
}
protected virtual async Task RunAsync()
{
if (StoppingToken.IsCancellationRequested)
{
return;
}
//
await using (var handle = await DistributedLock.
TryAcquireAsync(DistributedLockName, cancellationToken: StoppingToken))
{
if (handle != null)
{
//删除已经处理过的
await DeleteOldEventsAsync();
while (true)
{
//处理完退出,并释放锁
var waitingEvents = await Inbox.
GetWaitingEventsAsync(
EventBusBoxesOptions.InboxWaitingEventMaxCount, StoppingToken);
if (waitingEvents.Count <= 0)
{
break;
}
Logger.LogInformation($"Found {waitingEvents.Count} events in the inbox.");
foreach (var waitingEvent in waitingEvents)
{
//工作单元进行处理 手动开启
using (var uow = UnitOfWorkManager.Begin(
isTransactional: true, requiresNew: true))
{
//发送
await DistributedEventBus
.AsSupportsEventBoxes()
.ProcessFromInboxAsync(waitingEvent, InboxConfig);
await Inbox.MarkAsProcessedAsync(waitingEvent.Id);
await uow.CompleteAsync(StoppingToken);
}
Logger.LogInformation($"Processed the incoming event with id =
{waitingEvent.Id:N}");
}
}
}
else
{
//
Logger.LogDebug("Could not obtain the distributed lock: " +
DistributedLockName);
try
{ 延迟在此获取锁
await Task.Delay(EventBusBoxesOptions.DistributedLockWaitDuration,
StoppingToken);
}
catch (TaskCanceledException) { }
}
}
}
protected virtual async Task DeleteOldEventsAsync()
{
if (LastCleanTime != null && LastCleanTime + EventBusBoxesOptions.
CleanOldEventTimeIntervalSpan > Clock.Now)
{
return;
}
await Inbox.DeleteOldEventsAsync();
LastCleanTime = Clock.Now;
}
}
IOutboxSender
public interface IOutboxSender
{
Task StartAsync(OutboxConfig outboxConfig,
CancellationToken cancellationToken = default);
Task StopAsync(CancellationToken cancellationToken = default);
}
public class OutboxSender : IOutboxSender, ITransientDependency
{
protected IServiceProvider ServiceProvider { get; }
protected AbpAsyncTimer Timer { get; }
protected IDistributedEventBus DistributedEventBus { get; }
protected IAbpDistributedLock DistributedLock { get; }
protected IEventOutbox Outbox { get; private set; } = default!;
protected OutboxConfig OutboxConfig { get; private set; } = default!;
protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; }
protected string DistributedLockName { get; private set; } = default!;
public ILogger<OutboxSender> Logger { get; set; }
protected CancellationTokenSource StoppingTokenSource { get; }
protected CancellationToken StoppingToken { get; }
public OutboxSender(
IServiceProvider serviceProvider,
AbpAsyncTimer timer,
IDistributedEventBus distributedEventBus,
IAbpDistributedLock distributedLock,
IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions)
{
ServiceProvider = serviceProvider;
DistributedEventBus = distributedEventBus;
DistributedLock = distributedLock;
EventBusBoxesOptions = eventBusBoxesOptions.Value;
Timer = timer;
Timer.Period = Convert.ToInt32(EventBusBoxesOptions.PeriodTimeSpan.
TotalMilliseconds);
Timer.Elapsed += TimerOnElapsed;
Logger = NullLogger<OutboxSender>.Instance;
StoppingTokenSource = new CancellationTokenSource();
StoppingToken = StoppingTokenSource.Token;
}
public virtual Task StartAsync(OutboxConfig outboxConfig,
CancellationToken cancellationToken = default)
{
OutboxConfig = outboxConfig;
Outbox = (IEventOutbox)ServiceProvider.GetRequiredService(
outboxConfig.ImplementationType);
DistributedLockName = $"AbpOutbox_{OutboxConfig.DatabaseName}";
Timer.Start(cancellationToken);
return Task.CompletedTask;
}
public virtual Task StopAsync(CancellationToken cancellationToken = default)
{
StoppingTokenSource.Cancel();
Timer.Stop(cancellationToken);
StoppingTokenSource.Dispose();
return Task.CompletedTask;
}
private async Task TimerOnElapsed(AbpAsyncTimer arg)
{
await RunAsync();
}
protected virtual async Task RunAsync()
{
await using (var handle = await DistributedLock.TryAcquireAsync(
DistributedLockName, cancellationToken: StoppingToken))
{
if (handle != null)
{
while (true)
{
var waitingEvents = await Outbox.GetWaitingEventsAsync(
EventBusBoxesOptions.OutboxWaitingEventMaxCount, StoppingToken);
if (waitingEvents.Count <= 0)
{
break;
}
Logger.LogInformation($"Found {waitingEvents.Count}
events in the outbox.");
if (EventBusBoxesOptions.BatchPublishOutboxEvents)
{
await PublishOutgoingMessagesInBatchAsync(waitingEvents);
}
else
{
await PublishOutgoingMessagesAsync(waitingEvents);
}
}
}
else
{
Logger.LogDebug("Could not obtain the distributed lock: " +
DistributedLockName);
try
{
await Task.Delay(EventBusBoxesOptions.DistributedLockWaitDuration,
StoppingToken);
}
catch (TaskCanceledException) { }
}
}
}
protected virtual async Task PublishOutgoingMessagesAsync(
List<OutgoingEventInfo> waitingEvents)
{
foreach (var waitingEvent in waitingEvents)
{
await DistributedEventBus
.AsSupportsEventBoxes()
.PublishFromOutboxAsync(
waitingEvent,
OutboxConfig
);
await Outbox.DeleteAsync(waitingEvent.Id);
Logger.LogInformation($"Sent the event to the message broker with id =
{waitingEvent.Id:N}");
}
}
protected virtual async Task PublishOutgoingMessagesInBatchAsync(
List<OutgoingEventInfo> waitingEvents)
{
await DistributedEventBus
.AsSupportsEventBoxes()
.PublishManyFromOutboxAsync(waitingEvents, OutboxConfig);
await Outbox.DeleteManyAsync(waitingEvents.Select(x => x.Id).ToArray());
Logger.LogInformation($"Sent {waitingEvents.Count} events to message broker");
}
}
👍🎉🎊