ABP EventBus
ABP EventBus
2023/6/1
➡️

EventBus

IEventHandler 事件处理器

/// <summary>
/// Marker interface without members. The typed handler interfaces in this file
/// derive from it so handlers can be passed around untyped.
/// </summary>
public interface IEventHandler
{

}

ILocalEventHandler 本地事件处理器

/// <summary>
/// Local event handler for events of type <typeparamref name="TEvent"/>.
/// </summary>
/// <typeparam name="TEvent">Event type; declared contravariant (<c>in</c>).</typeparam>
public interface ILocalEventHandler<in TEvent> : IEventHandler
{
   /// <summary>Handles one event.</summary>
   /// <param name="eventData">The event payload.</param>
   Task HandleEventAsync(TEvent eventData);
}

ActionEventHandler 通过委托处理的 EventHandler

/// <summary>
/// Adapts a delegate to <see cref="ILocalEventHandler{TEvent}"/> so that a plain
/// asynchronous function can be used wherever an event handler is expected.
/// </summary>
/// <typeparam name="TEvent">Type of the event passed to the delegate.</typeparam>
public class ActionEventHandler<TEvent> :
    ILocalEventHandler<TEvent>,
    ITransientDependency
{
    /// <summary>The delegate that is awaited for every handled event.</summary>
    public Func<TEvent, Task> Action { get; }

    /// <summary>Creates the adapter around <paramref name="handler"/>.</summary>
    /// <param name="handler">Delegate stored in <see cref="Action"/>.</param>
    public ActionEventHandler(Func<TEvent, Task> handler) => Action = handler;

    /// <summary>Invokes <see cref="Action"/> with the event and awaits its task.</summary>
    /// <param name="eventData">The event payload.</param>
    public async Task HandleEventAsync(TEvent eventData)
    {
        var pending = Action(eventData);
        await pending;
    }
}

IDistributedEventHandler 分布式事件处理器

namespace Volo.Abp.EventBus.Distributed;

/// <summary>
/// Distributed event handler for events of type <typeparamref name="TEvent"/>.
/// </summary>
/// <typeparam name="TEvent">Event type; declared contravariant (<c>in</c>).</typeparam>
public interface IDistributedEventHandler<in TEvent> : IEventHandler
{
    /// <summary>Handles one event.</summary>
    /// <param name="eventData">The event payload.</param>
    Task HandleEventAsync(TEvent eventData);
}

IEventHandlerInvoker

/// <summary>
/// Invokes an untyped <see cref="IEventHandler"/> for a given event.
/// </summary>
public interface IEventHandlerInvoker
{
    /// <summary>Invokes <paramref name="eventHandler"/> with the event.</summary>
    /// <param name="eventHandler">Handler instance to call.</param>
    /// <param name="eventData">Event payload passed to the handler.</param>
    /// <param name="eventType">Event type the handler is invoked for.</param>
    Task InvokeAsync(IEventHandler eventHandler, object eventData, Type eventType);
}

//
/// <summary>
/// Delegate shape used to call a handler: receives the handler instance and the
/// event payload as <see cref="object"/>, and returns the handler's task.
/// </summary>
public delegate Task EventHandlerMethodExecutorAsync(IEventHandler target,
 object parameter);

/// <summary>
/// Exposes a delegate that executes a handler method.
/// </summary>
public interface IEventHandlerMethodExecutor
{
    /// <summary>Delegate that performs the handler call.</summary>
    EventHandlerMethodExecutorAsync ExecutorAsync { get; }
}

IEventHandlerFactory

获取 IEventHandler

/// <summary>
/// Pairs an <see cref="IEventHandler"/> with the cleanup that has to run once the
/// handler is no longer needed (via <see cref="IDisposable.Dispose"/>).
/// </summary>
public interface IEventHandlerDisposeWrapper : IDisposable
{
    /// <summary>The wrapped handler.</summary>
    IEventHandler EventHandler { get; }
}

/// <summary>
/// Default <see cref="IEventHandlerDisposeWrapper"/>: holds a handler together with
/// an optional callback that is run when the wrapper is disposed.
/// </summary>
public class EventHandlerDisposeWrapper : IEventHandlerDisposeWrapper
{
    private readonly Action? _disposeAction;

    /// <summary>The wrapped handler.</summary>
    public IEventHandler EventHandler { get; }

    /// <param name="eventHandler">Handler to expose through <see cref="EventHandler"/>.</param>
    /// <param name="disposeAction">Optional cleanup callback; nothing runs on dispose when null.</param>
    public EventHandlerDisposeWrapper(IEventHandler eventHandler, Action? disposeAction = null)
    {
        EventHandler = eventHandler;
        _disposeAction = disposeAction;
    }

    /// <summary>Runs the cleanup callback (if one was supplied) after the event was handled.</summary>
    public void Dispose()
    {
        if (_disposeAction != null)
        {
            _disposeAction();
        }
    }
}

//
/// <summary>
/// Produces event handlers on demand.
/// </summary>
public interface IEventHandlerFactory
{
    // Returns a handler wrapped so that the caller can dispose it after use.
    IEventHandlerDisposeWrapper GetHandler();
    // Tells whether an equivalent factory is already contained in the given list.
    bool IsInFactories(List<IEventHandlerFactory> handlerFactories);
}

IocEventHandlerFactory 从 IoC 容器中解析指定的 HandlerType

/// <summary>
/// Handler factory that resolves <see cref="HandlerType"/> from a freshly created
/// dependency-injection scope each time a handler is requested.
/// </summary>
public class IocEventHandlerFactory : IEventHandlerFactory, IDisposable
{
    /// <summary>Type of the handler resolved from the container.</summary>
    public Type HandlerType { get; }

    /// <summary>Factory used to create one scope per requested handler.</summary>
    protected IServiceScopeFactory ScopeFactory { get; }

    /// <param name="scopeFactory">Scope factory used by <see cref="GetHandler"/>.</param>
    /// <param name="handlerType">Handler type to resolve.</param>
    public IocEventHandlerFactory(IServiceScopeFactory scopeFactory, Type handlerType)
    {
        ScopeFactory = scopeFactory;
        HandlerType = handlerType;
    }

    /// <summary>
    /// Creates a scope, resolves the handler from it and returns both wrapped together.
    /// Disposing the returned wrapper disposes the scope.
    /// </summary>
    /// <returns>Wrapper owning the scope the handler was resolved from.</returns>
    public IEventHandlerDisposeWrapper GetHandler()
    {
        var scope = ScopeFactory.CreateScope();
        try
        {
            var handler = (IEventHandler)scope.ServiceProvider.GetRequiredService(HandlerType);

            // The closure keeps the scope alive until the wrapper is disposed; the
            // resolved handler must stay usable after this method has returned.
            return new EventHandlerDisposeWrapper(handler, () => scope.Dispose());
        }
        catch
        {
            // Resolution (or the cast) failed: nobody will ever receive a wrapper that
            // could dispose the scope, so release it here before propagating.
            scope.Dispose();
            throw;
        }
    }

    /// <summary>
    /// Checks whether the list already contains an <see cref="IocEventHandlerFactory"/>
    /// for the same <see cref="HandlerType"/>. The list is only read, never modified.
    /// </summary>
    public bool IsInFactories(List<IEventHandlerFactory> handlerFactories)
    {
        return handlerFactories
            .OfType<IocEventHandlerFactory>()
            .Any(f => f.HandlerType == HandlerType);
    }

    /// <summary>No-op; scopes are owned by the wrappers returned from <see cref="GetHandler"/>.</summary>
    public void Dispose()
    {

    }
}

SingleInstanceHandlerFactory 唯一的 IEventHandler

/// <summary>
/// Handler factory that always hands out the same, pre-built handler instance.
/// </summary>
public class SingleInstanceHandlerFactory : IEventHandlerFactory
{
    /// <summary>The single handler instance returned on every request.</summary>
    public IEventHandler HandlerInstance { get; }

    /// <param name="handler">Instance stored in <see cref="HandlerInstance"/>.</param>
    public SingleInstanceHandlerFactory(IEventHandler handler) => HandlerInstance = handler;

    /// <summary>Wraps <see cref="HandlerInstance"/> without any dispose callback.</summary>
    public IEventHandlerDisposeWrapper GetHandler() =>
        new EventHandlerDisposeWrapper(HandlerInstance);

    /// <summary>
    /// Returns true when the list already holds a <see cref="SingleInstanceHandlerFactory"/>
    /// wrapping the very same handler instance.
    /// </summary>
    public bool IsInFactories(List<IEventHandlerFactory> handlerFactories)
    {
        foreach (var candidate in handlerFactories.OfType<SingleInstanceHandlerFactory>())
        {
            if (candidate.HandlerInstance == HandlerInstance)
            {
                return true;
            }
        }

        return false;
    }
}

TransientEventHandlerFactory 每次通过反射创建一个新的处理器实例

/// <summary>
/// Handler factory that builds a brand-new handler through
/// <see cref="Activator.CreateInstance(Type)"/> on every request.
/// </summary>
public class TransientEventHandlerFactory : IEventHandlerFactory
{
    /// <summary>Type instantiated for each request.</summary>
    public Type HandlerType { get; }

    /// <param name="handlerType">Handler type to instantiate.</param>
    public TransientEventHandlerFactory(Type handlerType) => HandlerType = handlerType;

    /// <summary>
    /// Creates a handler and wraps it; disposing the wrapper disposes the handler
    /// when it implements <see cref="IDisposable"/>.
    /// </summary>
    public virtual IEventHandlerDisposeWrapper GetHandler()
    {
        var created = CreateHandler();

        void DisposeCreated()
        {
            if (created is IDisposable disposable)
            {
                disposable.Dispose();
            }
        }

        return new EventHandlerDisposeWrapper(created, DisposeCreated);
    }

    /// <summary>
    /// Returns true when the list already holds a <see cref="TransientEventHandlerFactory"/>
    /// for the same <see cref="HandlerType"/>.
    /// </summary>
    public bool IsInFactories(List<IEventHandlerFactory> handlerFactories)
    {
        foreach (var candidate in handlerFactories.OfType<TransientEventHandlerFactory>())
        {
            if (candidate.HandlerType == HandlerType)
            {
                return true;
            }
        }

        return false;
    }

    /// <summary>Instantiates <see cref="HandlerType"/> via its parameterless constructor.</summary>
    protected virtual IEventHandler CreateHandler() =>
        (IEventHandler)Activator.CreateInstance(HandlerType)!;
}

IEventBus 事件总线

/// <summary>
/// Event bus contract: publish events, and register or remove handlers for an event type.
/// </summary>
public interface IEventBus
{

    /// <summary>Publishes an event of type <typeparamref name="TEvent"/>.</summary>
    /// <param name="eventData">The event payload.</param>
    /// <param name="onUnitOfWorkComplete">
    /// In the <c>EventBusBase</c> implementation shown in this file: when true and a unit of
    /// work is current, the event is recorded on the unit of work instead of being published
    /// immediately.
    /// </param>
    Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = true)
        where TEvent : class;

    /// <summary>Non-generic variant of <c>PublishAsync</c> taking the event type explicitly.</summary>
    Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true);

    /// <summary>Subscribes a delegate as handler for <typeparamref name="TEvent"/>.</summary>
    /// <returns>A disposable handle for the subscription.</returns>
    IDisposable Subscribe<TEvent>(Func<TEvent, Task> action)
        where TEvent : class;

    /// <summary>Subscribes handler type <typeparamref name="THandler"/> for <typeparamref name="TEvent"/>.</summary>
    IDisposable Subscribe<TEvent, THandler>()
        where TEvent : class
        where THandler : IEventHandler, new();

    /// <summary>Subscribes a concrete handler instance for <paramref name="eventType"/>.</summary>
    IDisposable Subscribe(Type eventType, IEventHandler handler);

    /// <summary>Subscribes a handler factory for <typeparamref name="TEvent"/>.</summary>
    IDisposable Subscribe<TEvent>(IEventHandlerFactory factory)
        where TEvent : class;

    /// <summary>Subscribes a handler factory for <paramref name="eventType"/>.</summary>
    IDisposable Subscribe(Type eventType, IEventHandlerFactory factory);

    /// <summary>Removes a previously subscribed delegate.</summary>
    void Unsubscribe<TEvent>(Func<TEvent, Task> action)
        where TEvent : class;

    /// <summary>Removes a previously subscribed local handler instance.</summary>
    void Unsubscribe<TEvent>(ILocalEventHandler<TEvent> handler)
        where TEvent : class;

    /// <summary>Removes a previously subscribed handler instance for <paramref name="eventType"/>.</summary>
    void Unsubscribe(Type eventType, IEventHandler handler);

    /// <summary>Removes a previously subscribed factory for <typeparamref name="TEvent"/>.</summary>
    void Unsubscribe<TEvent>(IEventHandlerFactory factory)
        where TEvent : class;

    /// <summary>Removes a previously subscribed factory for <paramref name="eventType"/>.</summary>
    void Unsubscribe(Type eventType, IEventHandlerFactory factory);

    /// <summary>Removes every handler registered for <typeparamref name="TEvent"/>.</summary>
    void UnsubscribeAll<TEvent>()
        where TEvent : class;

    /// <summary>Removes every handler registered for <paramref name="eventType"/>.</summary>
    void UnsubscribeAll(Type eventType);
}

EventBusBase 抽象实现基类

public abstract class EventBusBase : IEventBus
{
    /// <summary>Creates DI scopes, e.g. for container-resolved handlers.</summary>
    protected IServiceScopeFactory ServiceScopeFactory { get; }

    /// <summary>Ambient tenant; switched while a handler runs.</summary>
    protected ICurrentTenant CurrentTenant { get; }

    /// <summary>Gives access to the current unit of work, if any.</summary>
    protected IUnitOfWorkManager UnitOfWorkManager { get; }

    /// <summary>Performs the actual handler invocation.</summary>
    protected IEventHandlerInvoker EventHandlerInvoker { get; }

    /// <summary>
    /// Stores the collaborators of the bus. The injected services outline what the
    /// base class is able to do: scoping, tenant switching, unit-of-work deferral
    /// and handler invocation.
    /// </summary>
    protected EventBusBase(
        IServiceScopeFactory serviceScopeFactory,
        ICurrentTenant currentTenant,
        IUnitOfWorkManager unitOfWorkManager,
        IEventHandlerInvoker eventHandlerInvoker)
    {
        EventHandlerInvoker = eventHandlerInvoker;
        UnitOfWorkManager = unitOfWorkManager;
        CurrentTenant = currentTenant;
        ServiceScopeFactory = serviceScopeFactory;
    }

    /// <summary>Subscribes a delegate by wrapping it in an <see cref="ActionEventHandler{TEvent}"/>.</summary>
    public virtual IDisposable Subscribe<TEvent>(Func<TEvent, Task> action)
        where TEvent : class
        => Subscribe(typeof(TEvent), new ActionEventHandler<TEvent>(action));

    /// <summary>Subscribes a handler type through a transient factory.</summary>
    public virtual IDisposable Subscribe<TEvent, THandler>()
        where TEvent : class
        where THandler : IEventHandler, new()
        => Subscribe(typeof(TEvent), new TransientEventHandlerFactory<THandler>());

    /// <summary>Subscribes a concrete handler instance through a <see cref="SingleInstanceHandlerFactory"/>.</summary>
    public virtual IDisposable Subscribe(Type eventType, IEventHandler handler)
        => Subscribe(eventType, new SingleInstanceHandlerFactory(handler));

    /// <summary>Subscribes a caller-supplied factory for <typeparamref name="TEvent"/>.</summary>
    public virtual IDisposable Subscribe<TEvent>(IEventHandlerFactory factory)
        where TEvent : class
        => Subscribe(typeof(TEvent), factory);

    /// <summary>
    /// Registers a factory for an event type. How registrations are stored is left to
    /// the concrete bus; every other overload ends up here.
    /// </summary>
    public abstract IDisposable Subscribe(Type eventType, IEventHandlerFactory factory);


    /// <summary>Removes a delegate subscription; implemented by the concrete bus.</summary>
    public abstract void Unsubscribe<TEvent>(Func<TEvent, Task> action)
        where TEvent : class;

    /// <summary>Removes a local handler instance by forwarding to the non-generic overload.</summary>
    public virtual void Unsubscribe<TEvent>(ILocalEventHandler<TEvent> handler)
        where TEvent : class
        => Unsubscribe(typeof(TEvent), handler);

    /// <summary>Removes a handler instance for an event type; implemented by the concrete bus.</summary>
    public abstract void Unsubscribe(Type eventType, IEventHandler handler);

    /// <summary>Removes a factory by forwarding to the non-generic overload.</summary>
    public virtual void Unsubscribe<TEvent>(IEventHandlerFactory factory)
        where TEvent : class
        => Unsubscribe(typeof(TEvent), factory);

    /// <summary>Removes a factory for an event type; implemented by the concrete bus.</summary>
    public abstract void Unsubscribe(Type eventType, IEventHandlerFactory factory);

    /// <summary>Removes all handlers of <typeparamref name="TEvent"/> via the non-generic overload.</summary>
    public virtual void UnsubscribeAll<TEvent>() where TEvent : class
        => UnsubscribeAll(typeof(TEvent));

    /// <summary>Removes all handlers of an event type; implemented by the concrete bus.</summary>
    public abstract void UnsubscribeAll(Type eventType);


    /// <summary>Generic convenience overload; forwards using <c>typeof(TEvent)</c>.</summary>
    public Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = true)
        where TEvent : class
        => PublishAsync(typeof(TEvent), eventData, onUnitOfWorkComplete);

    /// <summary>
    /// Publishes an event. When <paramref name="onUnitOfWorkComplete"/> is true and a
    /// unit of work is current, the event is only recorded on that unit of work;
    /// otherwise it is handed to <see cref="PublishToEventBusAsync"/> right away.
    /// </summary>
    /// <param name="eventType">Type the event is published as.</param>
    /// <param name="eventData">The event payload.</param>
    /// <param name="onUnitOfWorkComplete">Whether to defer to the current unit of work.</param>
    public virtual async Task PublishAsync(
        Type eventType,
        object eventData,
        bool onUnitOfWorkComplete = true)
    {
        if (!onUnitOfWorkComplete || UnitOfWorkManager.Current == null)
        {
            // No deferral requested or no unit of work to defer to: publish now.
            await PublishToEventBusAsync(eventType, eventData);
            return;
        }

        var unitOfWork = UnitOfWorkManager.Current;
        var record = new UnitOfWorkEventRecord(
            eventType,
            eventData,
            EventOrderGenerator.GetNext());
        AddToUnitOfWork(unitOfWork, record);
    }

    /// <summary>Publishes the event immediately; called by PublishAsync when nothing is deferred.</summary>
    protected abstract Task PublishToEventBusAsync(Type eventType, object eventData);

    /// <summary>Records a deferred event on the given unit of work.</summary>
    protected abstract void AddToUnitOfWork(IUnitOfWork unitOfWork,
     UnitOfWorkEventRecord eventRecord);


    /// <summary>
    /// Runs all handlers for the event and, if any of them failed, rethrows the
    /// collected failures through <see cref="ThrowOriginalExceptions"/>.
    /// </summary>
    public virtual async Task TriggerHandlersAsync(Type eventType, object eventData)
    {
        var collected = new List<Exception>();

        await TriggerHandlersAsync(eventType, eventData, collected);

        if (collected.Count == 0)
        {
            return;
        }

        ThrowOriginalExceptions(eventType, collected);
    }

    /// <summary>
    /// Triggers every handler factory returned by <see cref="GetHandlerFactories"/> and
    /// appends failures to <paramref name="exceptions"/>. For single-argument generic
    /// events implementing <c>IEventDataWithInheritableGenericArgument</c>, an event
    /// built for the base type of the generic argument is published afterwards.
    /// </summary>
    /// <param name="eventType">Type of the event being handled.</param>
    /// <param name="eventData">The event payload.</param>
    /// <param name="exceptions">Receives the exceptions raised by handlers.</param>
    /// <param name="inboxConfig">Optional inbox configuration forwarded to each handler call.</param>
    protected virtual async Task TriggerHandlersAsync(Type eventType,
        object eventData, List<Exception> exceptions, InboxConfig? inboxConfig = null)
    {
        await new SynchronizationContextRemover();

        foreach (var factoryGroup in GetHandlerFactories(eventType))
        {
            foreach (var factory in factoryGroup.EventHandlerFactories)
            {
                await TriggerHandlerAsync(
                    factory,
                    factoryGroup.EventType,
                    eventData,
                    exceptions,
                    inboxConfig);
            }
        }

        // Generic argument inheritance, see IEventDataWithInheritableGenericArgument.
        var supportsArgumentInheritance =
            eventType.GetTypeInfo().IsGenericType &&
            eventType.GetGenericArguments().Length == 1 &&
            typeof(IEventDataWithInheritableGenericArgument).IsAssignableFrom(eventType);
        if (!supportsArgumentInheritance)
        {
            return;
        }

        var baseArgument = eventType.GetGenericArguments()[0].GetTypeInfo().BaseType;
        if (baseArgument == null)
        {
            return;
        }

        var baseEventType = eventType.GetGenericTypeDefinition().MakeGenericType(baseArgument);
        var ctorArguments = ((IEventDataWithInheritableGenericArgument)eventData).GetConstructorArgs();
        var baseEventData = Activator.CreateInstance(baseEventType, ctorArguments)!;
        await PublishToEventBusAsync(baseEventType, baseEventData);
    }

    /// <summary>
    /// Surfaces handler failures: a single exception goes through <c>ReThrow()</c>,
    /// anything else is bundled into an <see cref="AggregateException"/>.
    /// </summary>
    /// <param name="eventType">Event type, used in the aggregate message.</param>
    /// <param name="exceptions">Exceptions collected while triggering handlers.</param>
    protected void ThrowOriginalExceptions(Type eventType, List<Exception> exceptions)
    {
        if (exceptions.Count == 1)
        {
            // ReThrow is a helper defined outside this file; presumably it throws the
            // original exception so the code below is not reached - TODO confirm.
            exceptions[0].ReThrow();
        }

        throw new AggregateException(
            "More than one error has occurred while triggering the event: " + eventType,
            exceptions
        );
    }

    /// <summary>
    /// Registers every handler type of the list: for each implemented interface that
    /// is an <see cref="IEventHandler"/> with exactly one generic argument, the type is
    /// subscribed to that argument through an <see cref="IocEventHandlerFactory"/>.
    /// </summary>
    /// <param name="handlers">Handler types to inspect.</param>
    protected virtual void SubscribeHandlers(ITypeList<IEventHandler> handlers)
    {
        foreach (var handlerType in handlers)
        {
            foreach (var implemented in handlerType.GetInterfaces())
            {
                var isHandlerInterface =
                    typeof(IEventHandler).GetTypeInfo().IsAssignableFrom(implemented);
                if (!isHandlerInterface)
                {
                    continue;
                }

                var typeArguments = implemented.GetGenericArguments();
                if (typeArguments.Length != 1)
                {
                    continue;
                }

                var factory = new IocEventHandlerFactory(ServiceScopeFactory, handlerType);
                Subscribe(typeArguments[0], factory);
            }
        }
    }

    /// <summary>
    /// Returns the handler factories to trigger for <paramref name="eventType"/>, each
    /// group paired with the event type it was registered for.
    /// </summary>
    protected abstract IEnumerable<EventTypeWithEventHandlerFactories>
     GetHandlerFactories(Type eventType);

    //
    /// <summary>
    /// Obtains one handler from the factory, invokes it under the tenant derived from
    /// the event, and records any failure in <paramref name="exceptions"/> instead of
    /// letting it propagate.
    /// </summary>
    /// <param name="asyncHandlerFactory">Factory that supplies the handler.</param>
    /// <param name="eventType">Event type the handler is invoked for.</param>
    /// <param name="eventData">The event payload.</param>
    /// <param name="exceptions">Receives the exception when the handler fails.</param>
    /// <param name="inboxConfig">
    /// Optional inbox configuration; when its <c>HandlerSelector</c> rejects the handler
    /// type, the handler is skipped.
    /// </param>
    protected virtual async Task TriggerHandlerAsync(
        IEventHandlerFactory asyncHandlerFactory, Type eventType,
        object eventData, List<Exception> exceptions, InboxConfig? inboxConfig = null)
    {
        // Disposing the wrapper runs the factory's cleanup (e.g. releases the DI scope
        // the handler was resolved from) once handling has finished.
        using (var eventHandlerWrapper = asyncHandlerFactory.GetHandler())
        {
            try
            {
                var handlerType = eventHandlerWrapper.EventHandler.GetType();

                if (inboxConfig?.HandlerSelector != null &&
                    !inboxConfig.HandlerSelector(handlerType))
                {
                    return;
                }

                using (CurrentTenant.Change(GetEventDataTenantId(eventData)))
                {
                    await InvokeEventHandlerAsync(
                        eventHandlerWrapper.EventHandler, eventData, eventType);
                }
            }
            catch (TargetInvocationException ex)
            {
                // Unwrap the reflection wrapper, but never record null: fall back to the
                // wrapper itself when it carries no inner exception.
                exceptions.Add(ex.InnerException ?? ex);
            }
            catch (Exception ex)
            {
                exceptions.Add(ex);
            }
        }
    }

    /// <summary>Delegates the handler call to <see cref="EventHandlerInvoker"/>.</summary>
    protected virtual Task InvokeEventHandlerAsync(
        IEventHandler eventHandler, object eventData, Type eventType)
        => EventHandlerInvoker.InvokeAsync(eventHandler, eventData, eventType);

    /// <summary>
    /// Determines the tenant a handler should run under: the event's own tenant id
    /// when it carries one, otherwise the id of the current tenant.
    /// </summary>
    /// <param name="eventData">The event payload to inspect.</param>
    protected virtual Guid? GetEventDataTenantId(object eventData)
    {
        if (eventData is IMultiTenant multiTenantEventData)
        {
            return multiTenantEventData.TenantId;
        }

        if (eventData is IEventDataMayHaveTenantId eventDataMayHaveTenantId &&
            eventDataMayHaveTenantId.IsMultiTenant(out var tenantId))
        {
            return tenantId;
        }

        return CurrentTenant.Id;
    }

    /// <summary>Pairs an event type with the handler factories registered for it.</summary>
    protected class EventTypeWithEventHandlerFactories
    {
        /// <summary>Event type the factories were registered for.</summary>
        public Type EventType { get; }

        /// <summary>Factories to trigger for <see cref="EventType"/>.</summary>
        public List<IEventHandlerFactory> EventHandlerFactories { get; }

        public EventTypeWithEventHandlerFactories(
            Type eventType,
            List<IEventHandlerFactory> eventHandlerFactories)
        {
            EventHandlerFactories = eventHandlerFactories;
            EventType = eventType;
        }
    }

    // Reference from
    // https://blogs.msdn.microsoft.com/benwilli/2017/
    // 02/09/an-alternative-to-configureawaitfalse-everywhere/
    /// <summary>
    /// Custom awaitable that acts as its own awaiter. Awaiting it runs the rest of the
    /// method with <see cref="SynchronizationContext.Current"/> cleared.
    /// </summary>
    protected struct SynchronizationContextRemover : INotifyCompletion
    {
        // Already "completed" when there is no synchronization context to remove.
        public bool IsCompleted {
            get { return SynchronizationContext.Current == null; }
        }

        // Runs the continuation inline with the context set to null and restores the
        // previous context afterwards, even if the continuation throws.
        public void OnCompleted(Action continuation)
        {
            var prevContext = SynchronizationContext.Current;
            try
            {
                SynchronizationContext.SetSynchronizationContext(null);
                continuation();
            }
            finally
            {
                SynchronizationContext.SetSynchronizationContext(prevContext);
            }
        }

        // The struct is its own awaiter.
        public SynchronizationContextRemover GetAwaiter()
        {
            return this;
        }

        // Nothing to return from the await.
        public void GetResult()
        {
        }
    }
}

EventHandlerInvoker 处理事件的最底层

/// <summary>
/// Delegate shape used to call a handler: receives the handler instance and the
/// event payload as <see cref="object"/>, and returns the handler's task.
/// </summary>
public delegate Task EventHandlerMethodExecutorAsync(IEventHandler target, object parameter);

/// <summary>
/// Exposes a delegate that executes a handler method.
/// </summary>
public interface IEventHandlerMethodExecutor
{
    /// <summary>Delegate that performs the handler call.</summary>
    EventHandlerMethodExecutorAsync ExecutorAsync { get; }
}

/// <summary>
/// Executor that treats the target as <see cref="ILocalEventHandler{TEvent}"/> and
/// calls its <c>HandleEventAsync</c> with the payload converted to <typeparamref name="TEvent"/>.
/// </summary>
public class LocalEventHandlerMethodExecutor<TEvent> : IEventHandlerMethodExecutor
    where TEvent : class
{
    /// <summary>Delegate performing the typed call on an untyped handler and payload.</summary>
    public EventHandlerMethodExecutorAsync ExecutorAsync =>
        (target, parameter) =>
        {
            var typedHandler = target.As<ILocalEventHandler<TEvent>>();
            return typedHandler.HandleEventAsync(parameter.As<TEvent>());
        };

    /// <summary>Runs <see cref="ExecutorAsync"/> for the given handler and event.</summary>
    public Task ExecuteAsync(IEventHandler target, TEvent parameters)
        => ExecutorAsync(target, parameters);
}

/// <summary>
/// Executor that treats the target as <see cref="IDistributedEventHandler{TEvent}"/> and
/// calls its <c>HandleEventAsync</c> with the payload converted to <typeparamref name="TEvent"/>.
/// </summary>
public class DistributedEventHandlerMethodExecutor<TEvent> : IEventHandlerMethodExecutor
    where TEvent : class
{
    /// <summary>Delegate performing the typed call on an untyped handler and payload.</summary>
    public EventHandlerMethodExecutorAsync ExecutorAsync =>
        (target, parameter) =>
        {
            var typedHandler = target.As<IDistributedEventHandler<TEvent>>();
            return typedHandler.HandleEventAsync(parameter.As<TEvent>());
        };

    /// <summary>Runs <see cref="ExecutorAsync"/> for the given handler and event.</summary>
    public Task ExecuteAsync(IEventHandler target, TEvent parameters)
        => ExecutorAsync(target, parameters);
}

EventHandlerInvoker 作为单例

/// <summary>
/// Invokes handlers through typed executors that are created by reflection once per
/// (handler type, event type) pair and then cached.
/// </summary>
public class EventHandlerInvoker : IEventHandlerInvoker, ISingletonDependency
{
    // Executors are built via reflection, so they are cached per handler/event pair.
    private readonly ConcurrentDictionary<string, EventHandlerInvokerCacheItem> _cache;

    public EventHandlerInvoker()
    {
        _cache = new ConcurrentDictionary<string, EventHandlerInvokerCacheItem>();
    }

    /// <summary>
    /// Calls the handler as a local handler and/or as a distributed handler for
    /// <paramref name="eventType"/>, depending on which interfaces it implements.
    /// </summary>
    /// <param name="eventHandler">Handler instance to call.</param>
    /// <param name="eventData">Event payload passed to the handler.</param>
    /// <param name="eventType">Event type used to close the generic handler interfaces.</param>
    /// <exception cref="AbpException">
    /// The handler implements neither handler interface for <paramref name="eventType"/>.
    /// </exception>
    public async Task InvokeAsync(IEventHandler eventHandler, object eventData,
        Type eventType)
    {
        var cacheItem = _cache.GetOrAdd(
            $"{eventHandler.GetType().FullName}-{eventType.FullName}", _ =>
            {
                var item = new EventHandlerInvokerCacheItem();

                if (typeof(ILocalEventHandler<>)
                    .MakeGenericType(eventType).IsInstanceOfType(eventHandler))
                {
                    item.Local = (IEventHandlerMethodExecutor?)Activator.CreateInstance(
                        typeof(LocalEventHandlerMethodExecutor<>).MakeGenericType(eventType));
                }

                if (typeof(IDistributedEventHandler<>)
                    .MakeGenericType(eventType).IsInstanceOfType(eventHandler))
                {
                    item.Distributed = (IEventHandlerMethodExecutor?)Activator.CreateInstance(
                        typeof(DistributedEventHandlerMethodExecutor<>).MakeGenericType(eventType));
                }

                return item;
            });

        if (cacheItem.Local != null)
        {
            // Run the handler as a local event handler.
            await cacheItem.Local.ExecutorAsync(eventHandler, eventData);
        }

        if (cacheItem.Distributed != null)
        {
            // Run the handler as a distributed event handler.
            await cacheItem.Distributed.ExecutorAsync(eventHandler, eventData);
        }

        if (cacheItem.Local == null && cacheItem.Distributed == null)
        {
            // The message is kept on one line: a regular string literal must not
            // contain a line break.
            throw new AbpException(
                "The object instance is not an event handler. Object type: " +
                eventHandler.GetType().AssemblyQualifiedName);
        }
    }
}

ILocalEventBus 本地事件总线

/// <summary>
/// Local event bus: an <see cref="IEventBus"/> that additionally accepts typed local
/// handler instances.
/// </summary>
public interface ILocalEventBus : IEventBus
{
    /// <summary>Subscribes a local handler instance for <typeparamref name="TEvent"/>.</summary>
    /// <returns>A disposable handle for the subscription.</returns>
    IDisposable Subscribe<TEvent>(ILocalEventHandler<TEvent> handler)
        where TEvent : class;
}

LocalEventBus 的实现

[ExposeServices(typeof(ILocalEventBus), typeof(LocalEventBus))]
public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
{

    /// <summary>Logger; defaults to a null logger until replaced.</summary>
    public ILogger<LocalEventBus> Logger { get; set; }

    /// <summary>Options of the local bus, including the handler types to register.</summary>
    protected AbpLocalEventBusOptions Options { get; }

    /// <summary>Registered handler factories, keyed by the event type they were subscribed to.</summary>
    protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories
    { get; }

    /// <summary>
    /// Initializes the bus and subscribes the handler types listed in the options.
    /// </summary>
    public LocalEventBus(
        IOptions<AbpLocalEventBusOptions> options,
        IServiceScopeFactory serviceScopeFactory,
        ICurrentTenant currentTenant,
        IUnitOfWorkManager unitOfWorkManager,
        IEventHandlerInvoker eventHandlerInvoker)
        : base(serviceScopeFactory, currentTenant, unitOfWorkManager, eventHandlerInvoker)
    {
        Logger = NullLogger<LocalEventBus>.Instance;
        Options = options.Value;

        // The registry has to exist before any handler is subscribed.
        HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
        SubscribeHandlers(Options.Handlers);
    }

    /// <summary>Subscribes a local handler instance via the non-generic overload.</summary>
    public virtual IDisposable Subscribe<TEvent>(ILocalEventHandler<TEvent> handler)
        where TEvent : class
        => Subscribe(typeof(TEvent), handler);

    //
    /// <summary>
    /// Adds the factory to the list kept for <paramref name="eventType"/> unless the
    /// factory reports that an equivalent one is already registered.
    /// </summary>
    /// <returns>An <c>EventHandlerFactoryUnregistrar</c> created for this bus, event type and factory.</returns>
    public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
    {
        // The bus keeps one factory list per event type in a concurrent dictionary.
        var registered = GetOrCreateHandlerFactories(eventType);

        registered.Locking(existing =>
        {
            if (factory.IsInFactories(existing))
            {
                // Already known; do not register twice.
                return;
            }

            existing.Add(factory);
        });

        // Handle used to cancel the subscription.
        return new EventHandlerFactoryUnregistrar(this, eventType, factory);
    }

    /// <summary>
    /// Removes every single-instance factory whose handler is an
    /// <see cref="ActionEventHandler{TEvent}"/> wrapping exactly <paramref name="action"/>.
    /// </summary>
    /// <param name="action">The delegate that was subscribed; must not be null.</param>
    public override void Unsubscribe<TEvent>(Func<TEvent, Task> action)
    {
        Check.NotNull(action, nameof(action));

        bool WrapsAction(IEventHandlerFactory candidate)
        {
            return candidate is SingleInstanceHandlerFactory singleInstance
                && singleInstance.HandlerInstance is ActionEventHandler<TEvent> actionHandler
                && actionHandler.Action == action;
        }

        GetOrCreateHandlerFactories(typeof(TEvent))
            .Locking(factories =>
            {
                factories.RemoveAll(WrapsAction);
            });
    }


    /// <summary>
    /// Removes the given handler instance from the factories of <paramref name="eventType"/>.
    /// Because a concrete instance is given, only <see cref="SingleInstanceHandlerFactory"/>
    /// entries wrapping that instance are removed.
    /// </summary>
    public override void Unsubscribe(Type eventType, IEventHandler handler)
    {
        // Look up the factory list kept for this event type.
        var registered = GetOrCreateHandlerFactories(eventType);

        registered.Locking(factories =>
        {
            factories.RemoveAll(candidate =>
                candidate is SingleInstanceHandlerFactory singleInstance &&
                singleInstance.HandlerInstance == handler);
        });
    }

    // Removes exactly the given factory from the list kept for the event type.
    // NOTE(review): Locking is defined outside this file; presumably it runs the
    // delegate while holding a lock on the list - confirm.
    public override void Unsubscribe(Type eventType, IEventHandlerFactory factory)
    {
        GetOrCreateHandlerFactories(eventType).Locking(factories =>
        factories.Remove(factory));
    }

    // Removes every handler factory registered for the event type.
    public override void UnsubscribeAll(Type eventType)
    {
        GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear());
    }

    /// <summary>
    /// Wraps the event in a <c>LocalEventMessage</c> with a new id and publishes it.
    /// </summary>
    protected override async Task PublishToEventBusAsync(Type eventType, object eventData)
    {
        var message = new LocalEventMessage(Guid.NewGuid(), eventData, eventType);
        await PublishAsync(message);
    }

    /// <summary>Records the event on the unit of work as a local event.</summary>
    protected override void AddToUnitOfWork(
        IUnitOfWork unitOfWork,
        UnitOfWorkEventRecord eventRecord)
        => unitOfWork.AddOrReplaceLocalEvent(eventRecord);

    /// <summary>Publishes a local event message by triggering its handlers.</summary>
    public virtual async Task PublishAsync(LocalEventMessage localEventMessage)
    {
        var eventType = localEventMessage.EventType;
        var eventData = localEventMessage.EventData;
        await TriggerHandlersAsync(eventType, eventData);
    }

    /// <summary>
    /// Collects the factories of every registration whose event type matches
    /// <paramref name="eventType"/> (same type or assignable from it) and returns them
    /// ordered by the <c>LocalEventHandlerOrderAttribute.Order</c> of their handler type
    /// (0 when the attribute is absent). Each result entry holds exactly one factory.
    /// </summary>
    /// <param name="eventType">Type of the event being published.</param>
    protected override IEnumerable<EventTypeWithEventHandlerFactories>
        GetHandlerFactories(Type eventType)
    {
        var handlerFactoryList = new List<Tuple<IEventHandlerFactory, Type, int>>();
        foreach (var handlerFactory in HandlerFactories.Where(hf =>
            ShouldTriggerEventForHandler(eventType, hf.Key)))
        {
            foreach (var factory in handlerFactory.Value)
            {
                int order;

                // A handler is obtained only to learn its runtime type. The wrapper must
                // be disposed afterwards; otherwise whatever the factory allocated for it
                // (e.g. the DI scope created by IocEventHandlerFactory) is never released.
                using (var handlerWrapper = factory.GetHandler())
                {
                    order = ReflectionHelper
                        .GetAttributesOfMemberOrDeclaringType<LocalEventHandlerOrderAttribute>(
                            handlerWrapper.EventHandler.GetType())
                        .FirstOrDefault()?.Order ?? 0;
                }

                handlerFactoryList.Add(new Tuple<IEventHandlerFactory, Type, int>(
                    factory,
                    handlerFactory.Key,
                    order));
            }
        }

        return handlerFactoryList.OrderBy(x => x.Item3).Select(x =>
            new EventTypeWithEventHandlerFactories(
                x.Item2, new List<IEventHandlerFactory> {x.Item1})).ToArray();
    }

    /// <summary>Returns the factory list of the event type, creating an empty one on first use.</summary>
    private List<IEventHandlerFactory> GetOrCreateHandlerFactories(Type eventType)
        => HandlerFactories.GetOrAdd(eventType, _ => new List<IEventHandlerFactory>());

    /// <summary>
    /// Decides whether a handler registered for <paramref name="handlerEventType"/> has to
    /// run for an event of <paramref name="targetEventType"/>: true for the same type and
    /// for handler types the event type can be assigned to (base types, interfaces).
    /// </summary>
    private static bool ShouldTriggerEventForHandler(
        Type targetEventType, Type handlerEventType)
        => handlerEventType == targetEventType
           || handlerEventType.IsAssignableFrom(targetEventType);

    // Internal for unit testing
    internal Func<Type, object, Task>? OnEventHandleInvoking { get; set; }

    /// <summary>
    /// Calls the <see cref="OnEventHandleInvoking"/> hook (skipped for
    /// <c>DistributedEventSent</c> / <c>DistributedEventReceived</c>) and then invokes
    /// the handler through the base implementation.
    /// </summary>
    // Internal for unit testing
    protected override async Task InvokeEventHandlerAsync(
        IEventHandler eventHandler, object eventData, Type eventType)
    {
        if (OnEventHandleInvoking != null
            && eventType != typeof(DistributedEventSent)
            && eventType != typeof(DistributedEventReceived))
        {
            await OnEventHandleInvoking(eventType, eventData);
        }

        await base.InvokeEventHandlerAsync(eventHandler, eventData, eventType);
    }

    // Internal for unit testing
    internal Func<Type, object, Task>? OnPublishing { get; set; }

    /// <summary>
    /// Same flow as the base implementation (defer to the current unit of work when
    /// requested, otherwise publish), plus the <see cref="OnPublishing"/> hook that runs
    /// before an immediate publish, except for <c>DistributedEventSent</c> /
    /// <c>DistributedEventReceived</c>.
    /// </summary>
    // For unit testing
    public override async Task PublishAsync(
        Type eventType,
        object eventData,
        bool onUnitOfWorkComplete = true)
    {
        if (onUnitOfWorkComplete && UnitOfWorkManager.Current != null)
        {
            var unitOfWork = UnitOfWorkManager.Current;
            var record = new UnitOfWorkEventRecord(
                eventType,
                eventData,
                EventOrderGenerator.GetNext());
            AddToUnitOfWork(unitOfWork, record);
            return;
        }

        // For unit testing
        if (OnPublishing != null
            && eventType != typeof(DistributedEventSent)
            && eventType != typeof(DistributedEventReceived))
        {
            await OnPublishing(eventType, eventData);
        }

        await PublishToEventBusAsync(eventType, eventData);
    }
}

IDistributedEventBus 分布式事件总线

/// <summary>
/// Distributed event bus: an <see cref="IEventBus"/> that accepts distributed handler
/// instances and offers publish overloads with an outbox switch.
/// </summary>
public interface IDistributedEventBus : IEventBus
{
    /// <summary>Subscribes a distributed handler instance for <typeparamref name="TEvent"/>.</summary>
    IDisposable Subscribe<TEvent>(IDistributedEventHandler<TEvent> handler)
        where TEvent : class;

    /// <summary>Publishes an event of type <typeparamref name="TEvent"/>.</summary>
    /// <param name="eventData">The event payload.</param>
    /// <param name="onUnitOfWorkComplete">Whether to defer to the current unit of work.</param>
    /// <param name="useOutbox">Whether the event may be stored in an outbox instead of being sent directly.</param>
    Task PublishAsync<TEvent>(
        TEvent eventData,
        bool onUnitOfWorkComplete = true,
        bool useOutbox = true)
        where TEvent : class;

    /// <summary>Non-generic variant taking the event type explicitly.</summary>
    Task PublishAsync(
        Type eventType,
        object eventData,
        bool onUnitOfWorkComplete = true,
        bool useOutbox = true);
}
namespace Volo.Abp.EventBus.Distributed;

public abstract class DistributedEventBusBase :
EventBusBase, IDistributedEventBus, ISupportsEventBoxes
{
    /// <summary>Creates ids for outgoing and incoming event records.</summary>
    protected IGuidGenerator GuidGenerator { get; }

    /// <summary>Clock used to timestamp event records.</summary>
    protected IClock Clock { get; }

    /// <summary>Distributed bus options, including the configured outboxes and inboxes.</summary>
    protected AbpDistributedEventBusOptions AbpDistributedEventBusOptions { get; }

    /// <summary>Local bus used to raise the sent/received notification events.</summary>
    protected ILocalEventBus LocalEventBus { get; }

    /// <summary>Supplies the correlation id attached to outgoing events.</summary>
    protected ICorrelationIdProvider CorrelationIdProvider { get; }

    /// <summary>Stores the collaborators and forwards the shared ones to the base class.</summary>
    protected DistributedEventBusBase(
        IServiceScopeFactory serviceScopeFactory,
        ICurrentTenant currentTenant,
        IUnitOfWorkManager unitOfWorkManager,
        IOptions<AbpDistributedEventBusOptions> abpDistributedEventBusOptions,
        IGuidGenerator guidGenerator,
        IClock clock,
        IEventHandlerInvoker eventHandlerInvoker,
        ILocalEventBus localEventBus,
        ICorrelationIdProvider correlationIdProvider)
        : base(serviceScopeFactory, currentTenant, unitOfWorkManager, eventHandlerInvoker)
    {
        AbpDistributedEventBusOptions = abpDistributedEventBusOptions.Value;
        GuidGenerator = guidGenerator;
        Clock = clock;
        LocalEventBus = localEventBus;
        CorrelationIdProvider = correlationIdProvider;
    }

    /// <summary>Subscribes a distributed handler instance via the non-generic overload.</summary>
    public IDisposable Subscribe<TEvent>(IDistributedEventHandler<TEvent> handler)
        where TEvent : class
        => Subscribe(typeof(TEvent), handler);

    /// <summary>Three-argument publish; forwards with <c>useOutbox</c> set to true.</summary>
    public override Task PublishAsync(
        Type eventType,
        object eventData,
        bool onUnitOfWorkComplete = true)
        => PublishAsync(eventType, eventData, onUnitOfWorkComplete, useOutbox: true);

    /// <summary>Generic convenience overload; forwards using <c>typeof(TEvent)</c>.</summary>
    public Task PublishAsync<TEvent>(
        TEvent eventData,
        bool onUnitOfWorkComplete = true,
        bool useOutbox = true)
        where TEvent : class
        => PublishAsync(typeof(TEvent), eventData, onUnitOfWorkComplete, useOutbox);

    /// <summary>
    /// Publishes a distributed event in up to three stages: defer to the current unit of
    /// work when requested; otherwise try to store it in an outbox when
    /// <paramref name="useOutbox"/> is set; otherwise raise the "sent" notification and
    /// publish directly.
    /// </summary>
    /// <param name="eventType">Type the event is published as.</param>
    /// <param name="eventData">The event payload.</param>
    /// <param name="onUnitOfWorkComplete">Whether to defer to the current unit of work.</param>
    /// <param name="useOutbox">Whether an outbox may take the event.</param>
    public async Task PublishAsync(
        Type eventType,
        object eventData,
        bool onUnitOfWorkComplete = true,
        bool useOutbox = true)
    {
        if (onUnitOfWorkComplete && UnitOfWorkManager.Current != null)
        {
            var unitOfWork = UnitOfWorkManager.Current;
            var record = new UnitOfWorkEventRecord(
                eventType,
                eventData,
                EventOrderGenerator.GetNext(),
                useOutbox);
            AddToUnitOfWork(unitOfWork, record);
            return;
        }

        if (useOutbox && await AddToOutboxAsync(eventType, eventData))
        {
            // Stored in an outbox; nothing is sent directly.
            return;
        }

        var sentNotification = new DistributedEventSent
        {
            Source = DistributedEventSource.Direct,
            EventName = EventNameAttribute.GetNameOrDefault(eventType),
            EventData = eventData
        };
        await TriggerDistributedEventSentAsync(sentNotification);

        await PublishToEventBusAsync(eventType, eventData);
    }

    /// <summary>Publishes one stored outgoing event; implemented by the concrete bus.</summary>
    public abstract Task PublishFromOutboxAsync(
        OutgoingEventInfo outgoingEvent,
        OutboxConfig outboxConfig
    );

    /// <summary>Publishes several stored outgoing events; implemented by the concrete bus.</summary>
    public abstract Task PublishManyFromOutboxAsync(
        IEnumerable<OutgoingEventInfo> outgoingEvents,
        OutboxConfig outboxConfig
    );

    /// <summary>Processes one stored incoming event; implemented by the concrete bus.</summary>
    public abstract Task ProcessFromInboxAsync(
        IncomingEventInfo incomingEvent,
        InboxConfig inboxConfig);

    /// <summary>
    /// Tries to enqueue the event into the first matching outbox. Outboxes with a
    /// selector are considered before those without one.
    /// </summary>
    /// <param name="eventType">Type of the event.</param>
    /// <param name="eventData">The event payload; serialized for storage.</param>
    /// <returns>
    /// True when the event was enqueued; false when there is no current unit of work or
    /// no outbox accepts the event type.
    /// </returns>
    protected virtual async Task<bool> AddToOutboxAsync(Type eventType, object eventData)
    {
        var unitOfWork = UnitOfWorkManager.Current;
        if (unitOfWork == null)
        {
            return false;
        }

        var candidates = AbpDistributedEventBusOptions.Outboxes.Values
            .OrderBy(x => x.Selector is null);

        foreach (var outboxConfig in candidates)
        {
            if (outboxConfig.Selector != null && !outboxConfig.Selector(eventType))
            {
                continue;
            }

            // The outbox is resolved from the unit of work's service provider.
            var eventOutbox = (IEventOutbox)unitOfWork.ServiceProvider
                .GetRequiredService(outboxConfig.ImplementationType);
            var eventName = EventNameAttribute.GetNameOrDefault(eventType);

            await OnAddToOutboxAsync(eventName, eventType, eventData);

            var outgoingEventInfo = new OutgoingEventInfo(
                GuidGenerator.Create(),
                eventName,
                Serialize(eventData),
                Clock.Now
            );
            outgoingEventInfo.SetCorrelationId(CorrelationIdProvider.Get()!);
            await eventOutbox.EnqueueAsync(outgoingEventInfo);
            return true;
        }

        return false;
    }

    /// <summary>
    /// Extension point called before an event is enqueued into an outbox; the default
    /// implementation does nothing.
    /// </summary>
    protected virtual Task OnAddToOutboxAsync(
        string eventName,
        Type eventType,
        object eventData)
        => Task.CompletedTask;

    /// <summary>
    /// Enqueues an incoming event into every matching inbox (inboxes with an event
    /// selector are visited first). An inbox that already contains the message id is
    /// skipped.
    /// </summary>
    /// <param name="messageId">Id of the incoming message; may be null or empty.</param>
    /// <param name="eventName">Name of the event.</param>
    /// <param name="eventType">Type of the event, used for inbox selection.</param>
    /// <param name="eventData">The event payload; serialized for storage.</param>
    /// <param name="correlationId">Correlation id stored with the record.</param>
    /// <returns>False when no inbox is configured at all; otherwise true.</returns>
    protected async Task<bool> AddToInboxAsync(
        string? messageId,
        string eventName,
        Type eventType,
        object eventData,
        string? correlationId)
    {
        if (AbpDistributedEventBusOptions.Inboxes.Count <= 0)
        {
            return false;
        }

        using (var scope = ServiceScopeFactory.CreateScope())
        {
            var candidates = AbpDistributedEventBusOptions.Inboxes.Values
                .OrderBy(x => x.EventSelector is null);

            foreach (var inboxConfig in candidates)
            {
                if (inboxConfig.EventSelector != null && !inboxConfig.EventSelector(eventType))
                {
                    continue;
                }

                var eventInbox = (IEventInbox)scope.ServiceProvider
                    .GetRequiredService(inboxConfig.ImplementationType);

                if (!messageId.IsNullOrEmpty() &&
                    await eventInbox.ExistsByMessageIdAsync(messageId!))
                {
                    // This inbox has already stored the message.
                    continue;
                }

                var incomingEventInfo = new IncomingEventInfo(
                    GuidGenerator.Create(),
                    messageId!,
                    eventName,
                    Serialize(eventData),
                    Clock.Now
                );
                incomingEventInfo.SetCorrelationId(correlationId!);
                await eventInbox.EnqueueAsync(incomingEventInfo);
            }
        }

        return true;
    }

    /// <summary>Converts an event payload into the bytes stored in outbox/inbox records.</summary>
    protected abstract byte[] Serialize(object eventData);

    /// <summary>
    /// Raises the "received" notification with source <c>Direct</c> and then triggers
    /// the handlers of the event.
    /// </summary>
    protected virtual async Task TriggerHandlersDirectAsync(
        Type eventType, object eventData)
    {
        var receivedNotification = new DistributedEventReceived
        {
            Source = DistributedEventSource.Direct,
            EventName = EventNameAttribute.GetNameOrDefault(eventType),
            EventData = eventData
        };
        await TriggerDistributedEventReceivedAsync(receivedNotification);

        await TriggerHandlersAsync(eventType, eventData);
    }

    /// <summary>
    /// Raises the "received" notification with source <c>Inbox</c> and then triggers the
    /// handlers, collecting failures in <paramref name="exceptions"/>.
    /// </summary>
    protected virtual async Task TriggerHandlersFromInboxAsync(
        Type eventType,
        object eventData,
        List<Exception> exceptions,
        InboxConfig? inboxConfig = null)
    {
        var receivedNotification = new DistributedEventReceived
        {
            Source = DistributedEventSource.Inbox,
            EventName = EventNameAttribute.GetNameOrDefault(eventType),
            EventData = eventData
        };
        await TriggerDistributedEventReceivedAsync(receivedNotification);

        await TriggerHandlersAsync(eventType, eventData, exceptions, inboxConfig);
    }

    /// <summary>
    /// Publishes the "sent" notification on the local bus. Failures are swallowed so
    /// that the notification never interrupts the caller.
    /// </summary>
    public virtual async Task TriggerDistributedEventSentAsync(
        DistributedEventSent distributedEvent)
    {
        try
        {
            var publishing = LocalEventBus.PublishAsync(distributedEvent);
            await publishing;
        }
        catch (Exception)
        {
            // ignored
        }
    }

    /// <summary>
    /// Publishes the "received" notification on the local bus. Failures are swallowed so
    /// that the notification never interrupts the caller.
    /// </summary>
    public virtual async Task TriggerDistributedEventReceivedAsync(
        DistributedEventReceived distributedEvent)
    {
        try
        {
            var publishing = LocalEventBus.PublishAsync(distributedEvent);
            await publishing;
        }
        catch (Exception)
        {
            // ignored
        }
    }
}

分布式事件总线基础概念支撑

OutgoingEventRecord 发件箱持有的数据

Volo.Abp.EntityFrameworkCore 模块定义的聚合根

namespace Volo.Abp.EntityFrameworkCore.DistributedEvents;

/// <summary>
/// Aggregate root holding one outbox entry: event name, serialized payload,
/// creation time and extra properties.
/// </summary>
public class OutgoingEventRecord :
    BasicAggregateRoot<Guid>,
    IHasExtraProperties,
    IHasCreationTime
{
    public static int MaxEventNameLength { get; set; } = 256;

    public ExtraPropertyDictionary ExtraProperties { get; private set; }

    /// <summary>Name of the event.</summary>
    public string EventName { get; private set; } = default!;

    /// <summary>Event payload as raw bytes (a serialized form that can be sent over the network).</summary>
    public byte[] EventData { get; private set; } = default!;

    public DateTime CreationTime { get; private set; }

    /// <summary>Parameterless constructor; only initializes the extra properties.</summary>
    protected OutgoingEventRecord()
    {
        ExtraProperties = new ExtraPropertyDictionary();
        this.SetDefaultsForExtraProperties();
    }

    /// <summary>Creates a record from the id, name, data and creation time of <paramref name="eventInfo"/>.</summary>
    public OutgoingEventRecord(OutgoingEventInfo eventInfo)
        : base(eventInfo.Id)
    {
        EventName = eventInfo.EventName;
        EventData = eventInfo.EventData;
        CreationTime = eventInfo.CreationTime;

        ExtraProperties = new ExtraPropertyDictionary();
        this.SetDefaultsForExtraProperties();
    }

    /// <summary>
    /// Builds an <see cref="OutgoingEventInfo"/> from this record and copies every extra property onto it.
    /// </summary>
    public OutgoingEventInfo ToOutgoingEventInfo()
    {
        var eventInfo = new OutgoingEventInfo(Id, EventName, EventData, CreationTime);

        foreach (var extraProperty in ExtraProperties)
        {
            eventInfo.SetProperty(extraProperty.Key, extraProperty.Value);
        }

        return eventInfo;
    }
}

IncomingEventRecord 收件箱持有的数据

Volo.Abp.EntityFrameworkCore 模块定义的聚合根

namespace Volo.Abp.EntityFrameworkCore.DistributedEvents;

/// <summary>
/// Aggregate root holding one inbox entry: message id, event name, serialized payload,
/// creation time, processing state and extra properties.
/// </summary>
public class IncomingEventRecord :
    BasicAggregateRoot<Guid>,
    IHasExtraProperties,
    IHasCreationTime
{
    public static int MaxEventNameLength { get; set; } = 256;

    public ExtraPropertyDictionary ExtraProperties { get; private set; }

    /// <summary>Message id (the original note says RabbitMQ needs it).</summary>
    public string MessageId { get; private set; } = default!;

    public string EventName { get; private set; } = default!;

    public byte[] EventData { get; private set; } = default!;

    public DateTime CreationTime { get; private set; }

    /// <summary>Whether the event has been processed.</summary>
    public bool Processed { get; set; }

    /// <summary>When the event was processed; null while unprocessed.</summary>
    public DateTime? ProcessedTime { get; set; }

    /// <summary>Parameterless constructor; only initializes the extra properties.</summary>
    protected IncomingEventRecord()
    {
        ExtraProperties = new ExtraPropertyDictionary();
        this.SetDefaultsForExtraProperties();
    }

    /// <summary>Creates a record from the id, message id, name, data and creation time of <paramref name="eventInfo"/>.</summary>
    public IncomingEventRecord(IncomingEventInfo eventInfo)
        : base(eventInfo.Id)
    {
        MessageId = eventInfo.MessageId;
        EventName = eventInfo.EventName;
        EventData = eventInfo.EventData;
        CreationTime = eventInfo.CreationTime;

        ExtraProperties = new ExtraPropertyDictionary();
        this.SetDefaultsForExtraProperties();
    }

    /// <summary>
    /// Builds an <see cref="IncomingEventInfo"/> from this record and copies every extra property onto it.
    /// </summary>
    public IncomingEventInfo ToIncomingEventInfo()
    {
        var eventInfo = new IncomingEventInfo(Id, MessageId, EventName, EventData, CreationTime);

        foreach (var extraProperty in ExtraProperties)
        {
            eventInfo.SetProperty(extraProperty.Key, extraProperty.Value);
        }

        return eventInfo;
    }

    /// <summary>Flags the record as processed and stores the given processing time.</summary>
    public void MarkAsProcessed(DateTime processedTime)
    {
        (Processed, ProcessedTime) = (true, processedTime);
    }
}

IHasEventInbox IHasEventOutbox 数据库上下文

数据库上下文持有的收发件箱的数据

namespace Volo.Abp.EntityFrameworkCore.DistributedEvents;

// Contract for an EF Core DbContext that exposes the inbox records.
public interface IHasEventInbox : IEfCoreDbContext
{
    // Set of stored incoming event records.
    DbSet<IncomingEventRecord> IncomingEvents { get; set; }
}

// Contract for an EF Core DbContext that exposes the outbox records.
public interface IHasEventOutbox : IEfCoreDbContext
{
    // Set of stored outgoing event records.
    DbSet<OutgoingEventRecord> OutgoingEvents { get; set; }
}

收发件箱提供的服务

namespace Volo.Abp.EventBus.Distributed;

// Operations offered by an event outbox.
public interface IEventOutbox
{
    // Adds an event to the outbox.
    Task EnqueueAsync(OutgoingEventInfo outgoingEvent);

    // Gets the events waiting in the outbox; maxCount caps how many are returned.
    Task<List<OutgoingEventInfo>> GetWaitingEventsAsync(int maxCount,
     CancellationToken cancellationToken = default);

    // Deletes a single outbox entry by id.
    Task DeleteAsync(Guid id);

    // Deletes several outbox entries by id.
    Task DeleteManyAsync(IEnumerable<Guid> ids);
}

// Operations offered by an event inbox.
public interface IEventInbox
{
    // Adds an incoming event to the inbox.
    Task EnqueueAsync(IncomingEventInfo incomingEvent);

    // Gets the events waiting in the inbox; maxCount caps how many are returned.
    Task<List<IncomingEventInfo>> GetWaitingEventsAsync(int maxCount,
    CancellationToken cancellationToken = default);

    // Marks the event with the given id as processed.
    Task MarkAsProcessedAsync(Guid id);

    // Checks whether a message with the given message id already exists in the inbox.
    Task<bool> ExistsByMessageIdAsync(string messageId);

    // Deletes old events that have already been processed.
    Task DeleteOldEventsAsync();
}

收发件箱服务由 EF Core 提供实现

// Outbox contract tied to a DbContext type that implements IHasEventOutbox; adds no members of its own.
public interface IDbContextEventOutbox<TDbContext> : IEventOutbox
    where TDbContext : IHasEventOutbox
{

}

// Inbox contract tied to a DbContext type that implements IHasEventInbox; adds no members of its own.
public interface IDbContextEventInbox<TDbContext> : IEventInbox
    where TDbContext : IHasEventInbox
{

}

IDbContextEventInbox 的实现:收件箱中的事件处理后不会立即删除,而是会保留一段时间

namespace Volo.Abp.EntityFrameworkCore.DistributedEvents;

/// <summary>
/// Inbox implementation that stores incoming events through the
/// <c>IncomingEvents</c> set of an EF Core DbContext.
/// </summary>
public class DbContextEventInbox<TDbContext> : IDbContextEventInbox<TDbContext>
    where TDbContext : IHasEventInbox
{
    protected IDbContextProvider<TDbContext> DbContextProvider { get; }
    protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; }
    protected IClock Clock { get; }

    /// <summary>Stores the DbContext provider, the clock and the unwrapped boxes options.</summary>
    public DbContextEventInbox(
        IDbContextProvider<TDbContext> dbContextProvider,
        IClock clock,
        IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions)
    {
        DbContextProvider = dbContextProvider;
        Clock = clock;
        EventBusBoxesOptions = eventBusBoxesOptions.Value;
    }

    /// <summary>Adds a new record built from <paramref name="incomingEvent"/> to the inbox set.</summary>
    [UnitOfWork]
    public virtual async Task EnqueueAsync(IncomingEventInfo incomingEvent)
    {
        // Original author's note: this service does not get a unit of work by default, so
        // [UnitOfWork] is applied by hand. An ambient unit of work (e.g. from the unit-of-work
        // middleware) is reused when present; otherwise a new one is started.
        // NOTE(review): that behavior comes from ABP itself and is not visible here - confirm.
        var context = await DbContextProvider.GetDbContextAsync();
        var record = new IncomingEventRecord(incomingEvent);
        context.IncomingEvents.Add(record);
    }

    /// <summary>
    /// Reads, without tracking, up to <paramref name="maxCount"/> unprocessed records
    /// ordered by creation time and converts them to <see cref="IncomingEventInfo"/>.
    /// </summary>
    [UnitOfWork]
    public virtual async Task<List<IncomingEventInfo>> GetWaitingEventsAsync(
        int maxCount,
        CancellationToken cancellationToken = default)
    {
        var context = await DbContextProvider.GetDbContextAsync();

        var pendingQuery = context.IncomingEvents
            .AsNoTracking()
            .Where(record => !record.Processed)
            .OrderBy(record => record.CreationTime)
            .Take(maxCount);

        var pendingRecords = await pendingQuery.ToListAsync(cancellationToken);

        return pendingRecords.ConvertAll(record => record.ToIncomingEventInfo());
    }

    /// <summary>
    /// Marks the record with the given id as processed and stamps its processed time,
    /// using a bulk update statement.
    /// </summary>
    [UnitOfWork]
    public virtual async Task MarkAsProcessedAsync(Guid id)
    {
        var context = await DbContextProvider.GetDbContextAsync();
        var target = context.IncomingEvents.Where(record => record.Id == id);

        await target.ExecuteUpdateAsync(setters => setters
            .SetProperty(record => record.Processed, _ => true)
            .SetProperty(record => record.ProcessedTime, _ => Clock.Now));
    }

    /// <summary>Returns whether any inbox record has the given message id.</summary>
    [UnitOfWork]
    public virtual async Task<bool> ExistsByMessageIdAsync(string messageId)
    {
        var context = await DbContextProvider.GetDbContextAsync();
        return await context.IncomingEvents.AnyAsync(record => record.MessageId == messageId);
    }

    /// <summary>
    /// Deletes processed records whose creation time is earlier than
    /// now minus <c>WaitTimeToDeleteProcessedInboxEvents</c>.
    /// </summary>
    [UnitOfWork]
    public virtual async Task DeleteOldEventsAsync()
    {
        var context = await DbContextProvider.GetDbContextAsync();
        var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents;

        var expired = context.IncomingEvents
            .Where(record => record.Processed && record.CreationTime < timeToKeepEvents);

        await expired.ExecuteDeleteAsync();
    }
}

IDbContextEventOutbox 的实现

/// <summary>
/// Outbox implementation that stores outgoing events through the
/// <c>OutgoingEvents</c> set of an EF Core DbContext.
/// </summary>
public class DbContextEventOutbox<TDbContext> : IDbContextEventOutbox<TDbContext>
    where TDbContext : IHasEventOutbox
{
    protected IDbContextProvider<TDbContext> DbContextProvider { get; }

    public DbContextEventOutbox(IDbContextProvider<TDbContext> dbContextProvider)
    {
        DbContextProvider = dbContextProvider;
    }

    /// <summary>Adds a new record built from <paramref name="outgoingEvent"/> to the outbox set.</summary>
    [UnitOfWork]
    public virtual async Task EnqueueAsync(OutgoingEventInfo outgoingEvent)
    {
        var context = await ResolveOutboxContextAsync();
        var record = new OutgoingEventRecord(outgoingEvent);
        context.OutgoingEvents.Add(record);
    }

    /// <summary>
    /// Reads, without tracking, up to <paramref name="maxCount"/> outbox records
    /// ordered by creation time and converts them to <see cref="OutgoingEventInfo"/>.
    /// </summary>
    // Original author's note: because the method name starts with "Get" it does not join a transaction.
    // NOTE(review): that convention comes from ABP and is not visible here - confirm.
    [UnitOfWork]
    public virtual async Task<List<OutgoingEventInfo>> GetWaitingEventsAsync(
        int maxCount,
        CancellationToken cancellationToken = default)
    {
        var context = await ResolveOutboxContextAsync();

        var pendingQuery = context.OutgoingEvents
            .AsNoTracking()
            .OrderBy(record => record.CreationTime)
            .Take(maxCount);

        var pendingRecords = await pendingQuery.ToListAsync(cancellationToken);

        return pendingRecords.ConvertAll(record => record.ToOutgoingEventInfo());
    }

    /// <summary>Deletes the outbox record with the given id using a bulk delete statement.</summary>
    [UnitOfWork]
    public virtual async Task DeleteAsync(Guid id)
    {
        var context = await ResolveOutboxContextAsync();
        var target = context.OutgoingEvents.Where(record => record.Id == id);
        await target.ExecuteDeleteAsync();
    }

    /// <summary>Deletes every outbox record whose id is contained in <paramref name="ids"/>.</summary>
    [UnitOfWork]
    public virtual async Task DeleteManyAsync(IEnumerable<Guid> ids)
    {
        var context = await ResolveOutboxContextAsync();
        var targets = context.OutgoingEvents.Where(record => ids.Contains(record.Id));
        await targets.ExecuteDeleteAsync();
    }

    // Gets the DbContext from the provider, viewed through the IHasEventOutbox interface.
    private async Task<IHasEventOutbox> ResolveOutboxContextAsync()
    {
        return (IHasEventOutbox)await DbContextProvider.GetDbContextAsync();
    }
}

收发件箱的配置

/// <summary>
/// Settings of one named inbox.
/// </summary>
public class InboxConfig
{
    // Backing field of DatabaseName. UseDbContext in this file fills it with a connection string name.
    [NotNull] private string _databaseName = default!;

    [NotNull]
    public string Name { get; }

    /// <summary>Database name; assigning null or whitespace is rejected by the check.</summary>
    [NotNull]
    public string DatabaseName
    {
        get { return _databaseName; }
        set { _databaseName = Check.NotNullOrWhiteSpace(value, nameof(DatabaseName)); }
    }

    /// <summary>Type used to resolve the inbox service.</summary>
    public Type ImplementationType { get; set; } = default!;

    /// <summary>Optional filter on event types; per the original note, not every event enters the inbox.</summary>
    public Func<Type, bool>? EventSelector { get; set; }

    /// <summary>Optional filter on handler types; per the original note, it decides which handlers may process inbox events.</summary>
    public Func<Type, bool>? HandlerSelector { get; set; }

    /// <summary>
    /// Used to enable/disable processing incoming events.
    /// Default: true.
    /// </summary>
    public bool IsProcessingEnabled { get; set; } = true;

    /// <summary>Creates the configuration; <paramref name="name"/> must not be null or whitespace.</summary>
    public InboxConfig([NotNull] string name)
    {
        Name = Check.NotNullOrWhiteSpace(name, nameof(name));
    }
}
/// <summary>
/// Settings of one named outbox.
/// </summary>
public class OutboxConfig
{
    // Backing field of DatabaseName. UseDbContext in this file fills it with a connection string name.
    [NotNull] private string _databaseName = default!;

    [NotNull]
    public string Name { get; }

    /// <summary>Database name; assigning null or whitespace is rejected by the check.</summary>
    [NotNull]
    public string DatabaseName
    {
        get { return _databaseName; }
        set { _databaseName = Check.NotNullOrWhiteSpace(value, nameof(DatabaseName)); }
    }

    /// <summary>Type used to resolve the outbox service.</summary>
    public Type ImplementationType { get; set; } = default!;

    /// <summary>Optional filter selecting which event types enter the outbox.</summary>
    public Func<Type, bool>? Selector { get; set; }

    /// <summary>
    /// Used to enable/disable sending events from outbox to the message broker.
    /// Default: true.
    /// </summary>
    public bool IsSendingEnabled { get; set; } = true;

    /// <summary>Creates the configuration; <paramref name="name"/> must not be null or whitespace.</summary>
    public OutboxConfig([NotNull] string name)
    {
        Name = Check.NotNullOrWhiteSpace(name, nameof(name));
    }
}
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents;

/// <summary>
/// Extension for pointing an <see cref="InboxConfig"/> at an EF Core DbContext.
/// </summary>
public static class EfCoreInboxConfigExtensions
{
    /// <summary>
    /// Sets the implementation type to <c>IDbContextEventInbox&lt;TDbContext&gt;</c> and the
    /// database name to the connection string name obtained for <typeparamref name="TDbContext"/>.
    /// </summary>
    public static void UseDbContext<TDbContext>(this InboxConfig outboxConfig)
        where TDbContext : IHasEventInbox
    {
        // The interface type is stored here; the inbox processor later resolves it from the service provider.
        outboxConfig.ImplementationType = typeof(IDbContextEventInbox<TDbContext>);

        // Connection string name is looked up from the DbContext type.
        var connectionStringName = ConnectionStringNameAttribute.GetConnStringName<TDbContext>();
        outboxConfig.DatabaseName = connectionStringName;
    }
}

/// <summary>
/// Extension for pointing an <see cref="OutboxConfig"/> at an EF Core DbContext.
/// </summary>
public static class EfCoreOutboxConfigExtensions
{
    /// <summary>
    /// Sets the implementation type to <c>IDbContextEventOutbox&lt;TDbContext&gt;</c> and the
    /// database name to the connection string name obtained for <typeparamref name="TDbContext"/>.
    /// </summary>
    public static void UseDbContext<TDbContext>(this OutboxConfig outboxConfig)
        where TDbContext : IHasEventOutbox
    {
        // The interface type is stored here; the outbox sender later resolves it from the service provider.
        outboxConfig.ImplementationType = typeof(IDbContextEventOutbox<TDbContext>);

        // Connection string name is looked up from the DbContext type.
        var connectionStringName = ConnectionStringNameAttribute.GetConnStringName<TDbContext>();
        outboxConfig.DatabaseName = connectionStringName;
    }
}

DbContextEventOutbox , DbContextEventInbox 注入的地方

namespace Volo.Abp.EntityFrameworkCore;

/// <summary>
/// Module that pre-configures the DbContext options and registers the DbContext provider
/// together with the EF Core based outbox and inbox services.
/// </summary>
[DependsOn(typeof(AbpDddDomainModule))]
public class AbpEntityFrameworkCoreModule : AbpModule
{
    public override void ConfigureServices(ServiceConfigurationContext context)
    {
        // Ignore the "lazy load on disposed context" warning through the options' PreConfigure hook.
        Configure<AbpDbContextOptions>(options =>
        {
            options.PreConfigure(configurationContext =>
                configurationContext.DbContextOptions.ConfigureWarnings(warnings =>
                    warnings.Ignore(CoreEventId.LazyLoadOnDisposedContextWarning)));
        });

        var services = context.Services;

        // Open-generic registrations, all transient.
        services.TryAddTransient(typeof(IDbContextProvider<>), typeof(UnitOfWorkDbContextProvider<>));
        services.AddTransient(typeof(IDbContextEventOutbox<>), typeof(DbContextEventOutbox<>));
        services.AddTransient(typeof(IDbContextEventInbox<>), typeof(DbContextEventInbox<>));
    }
}

IInboxProcessor 处理收件箱

// Background worker contract for processing one inbox.
public interface IInboxProcessor
{
    // Starts processing for the given inbox configuration.
    Task StartAsync(InboxConfig inboxConfig, CancellationToken
    cancellationToken = default);

    // Stops processing.
    Task StopAsync(CancellationToken cancellationToken = default);
}

namespace Volo.Abp.EventBus.Distributed;

/// <summary>
/// Timer-driven worker that, while holding a distributed lock, reads waiting events from
/// the configured inbox, hands each one to the distributed event bus and marks it as processed.
/// </summary>
public class InboxProcessor : IInboxProcessor, ITransientDependency
{
    protected IServiceProvider ServiceProvider { get; }
    protected AbpAsyncTimer Timer { get; }
    protected IDistributedEventBus DistributedEventBus { get; }
    protected IAbpDistributedLock DistributedLock { get; }
    protected IUnitOfWorkManager UnitOfWorkManager { get; }
    protected IClock Clock { get; }
    protected IEventInbox Inbox { get; private set; } = default!;
    protected InboxConfig InboxConfig { get; private set; } = default!;
    protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; }

    // Time of the last successful cleanup; null until the first cleanup has run.
    protected DateTime? LastCleanTime { get; set; }

    protected string DistributedLockName { get; private set; } = default!;
    public ILogger<InboxProcessor> Logger { get; set; }
    protected CancellationTokenSource StoppingTokenSource { get; }
    protected CancellationToken StoppingToken { get; }

    /// <summary>
    /// Stores the dependencies, sets the timer period from <c>PeriodTimeSpan</c>,
    /// subscribes to the timer and creates the stopping token.
    /// </summary>
    public InboxProcessor(
        IServiceProvider serviceProvider,
        AbpAsyncTimer timer,
        IDistributedEventBus distributedEventBus,
        IAbpDistributedLock distributedLock,
        IUnitOfWorkManager unitOfWorkManager,
        IClock clock,
        IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions)
    {
        ServiceProvider = serviceProvider;
        Timer = timer;
        DistributedEventBus = distributedEventBus;
        DistributedLock = distributedLock;
        UnitOfWorkManager = unitOfWorkManager;
        Clock = clock;
        EventBusBoxesOptions = eventBusBoxesOptions.Value;
        Timer.Period = Convert.ToInt32(EventBusBoxesOptions.PeriodTimeSpan.TotalMilliseconds);
        Timer.Elapsed += TimerOnElapsed;
        Logger = NullLogger<InboxProcessor>.Instance;
        StoppingTokenSource = new CancellationTokenSource();
        StoppingToken = StoppingTokenSource.Token;
    }

    // Timer callback: each tick cleans old events and processes the waiting ones.
    private async Task TimerOnElapsed(AbpAsyncTimer arg)
    {
        await RunAsync();
    }

    /// <summary>
    /// Remembers the configuration, resolves the inbox service from its implementation type,
    /// derives the lock name from the database name and starts the timer.
    /// </summary>
    public Task StartAsync(InboxConfig inboxConfig, CancellationToken cancellationToken = default)
    {
        InboxConfig = inboxConfig;
        Inbox = (IEventInbox)ServiceProvider.GetRequiredService(inboxConfig.ImplementationType);
        DistributedLockName = $"AbpInbox_{InboxConfig.DatabaseName}";
        Timer.Start(cancellationToken);
        return Task.CompletedTask;
    }

    /// <summary>Signals cancellation, stops the timer and disposes the token source.</summary>
    public Task StopAsync(CancellationToken cancellationToken = default)
    {
        StoppingTokenSource.Cancel();
        Timer.Stop(cancellationToken);
        StoppingTokenSource.Dispose();
        return Task.CompletedTask;
    }

    /// <summary>
    /// One processing pass. Does nothing once stopping was requested. With the distributed lock
    /// held, it cleans old events and drains the inbox batch by batch; without the lock, it waits
    /// <c>DistributedLockWaitDuration</c> before returning.
    /// </summary>
    protected virtual async Task RunAsync()
    {
        if (StoppingToken.IsCancellationRequested)
        {
            return;
        }

        await using (var handle = await DistributedLock.TryAcquireAsync(
            DistributedLockName, cancellationToken: StoppingToken))
        {
            if (handle != null)
            {
                // Remove events that were already processed.
                await DeleteOldEventsAsync();

                while (true)
                {
                    // Leave the loop (and release the lock) once nothing is waiting.
                    var waitingEvents = await Inbox.GetWaitingEventsAsync(
                        EventBusBoxesOptions.InboxWaitingEventMaxCount, StoppingToken);
                    if (waitingEvents.Count <= 0)
                    {
                        break;
                    }

                    Logger.LogInformation($"Found {waitingEvents.Count} events in the inbox.");

                    foreach (var waitingEvent in waitingEvents)
                    {
                        // Each event runs in its own, manually started, transactional unit of work.
                        using (var uow = UnitOfWorkManager.Begin(isTransactional: true, requiresNew: true))
                        {
                            // Hand the event to the event bus for processing.
                            await DistributedEventBus
                                .AsSupportsEventBoxes()
                                .ProcessFromInboxAsync(waitingEvent, InboxConfig);

                            await Inbox.MarkAsProcessedAsync(waitingEvent.Id);

                            await uow.CompleteAsync(StoppingToken);
                        }

                        // The literal must stay on one line: a regular interpolated string
                        // cannot contain a line break in its text.
                        Logger.LogInformation($"Processed the incoming event with id = {waitingEvent.Id:N}");
                    }
                }
            }
            else
            {
                Logger.LogDebug("Could not obtain the distributed lock: " + DistributedLockName);
                try
                {
                    // Wait before returning so the next attempt to take the lock is delayed.
                    await Task.Delay(EventBusBoxesOptions.DistributedLockWaitDuration, StoppingToken);
                }
                catch (TaskCanceledException) { }
            }
        }
    }

    /// <summary>
    /// Deletes old inbox events, at most once per <c>CleanOldEventTimeIntervalSpan</c>.
    /// </summary>
    protected virtual async Task DeleteOldEventsAsync()
    {
        if (LastCleanTime != null &&
            LastCleanTime + EventBusBoxesOptions.CleanOldEventTimeIntervalSpan > Clock.Now)
        {
            return;
        }

        await Inbox.DeleteOldEventsAsync();

        LastCleanTime = Clock.Now;
    }
}

IOutboxSender

// Background worker contract for sending the events of one outbox.
public interface IOutboxSender
{
    // Starts sending for the given outbox configuration.
    Task StartAsync(OutboxConfig outboxConfig,
     CancellationToken cancellationToken = default);

    // Stops sending.
    Task StopAsync(CancellationToken cancellationToken = default);
}
/// <summary>
/// Timer-driven worker that, while holding a distributed lock, reads waiting events from
/// the configured outbox, publishes them through the distributed event bus and deletes them.
/// </summary>
public class OutboxSender : IOutboxSender, ITransientDependency
{
    protected IServiceProvider ServiceProvider { get; }
    protected AbpAsyncTimer Timer { get; }
    protected IDistributedEventBus DistributedEventBus { get; }
    protected IAbpDistributedLock DistributedLock { get; }
    protected IEventOutbox Outbox { get; private set; } = default!;
    protected OutboxConfig OutboxConfig { get; private set; } = default!;
    protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; }
    protected string DistributedLockName { get; private set; } = default!;
    public ILogger<OutboxSender> Logger { get; set; }

    protected CancellationTokenSource StoppingTokenSource { get; }
    protected CancellationToken StoppingToken { get; }

    /// <summary>
    /// Stores the dependencies, sets the timer period from <c>PeriodTimeSpan</c>,
    /// subscribes to the timer and creates the stopping token.
    /// </summary>
    public OutboxSender(
        IServiceProvider serviceProvider,
        AbpAsyncTimer timer,
        IDistributedEventBus distributedEventBus,
        IAbpDistributedLock distributedLock,
        IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions)
    {
        ServiceProvider = serviceProvider;
        DistributedEventBus = distributedEventBus;
        DistributedLock = distributedLock;
        EventBusBoxesOptions = eventBusBoxesOptions.Value;
        Timer = timer;
        Timer.Period = Convert.ToInt32(EventBusBoxesOptions.PeriodTimeSpan.TotalMilliseconds);
        Timer.Elapsed += TimerOnElapsed;
        Logger = NullLogger<OutboxSender>.Instance;
        StoppingTokenSource = new CancellationTokenSource();
        StoppingToken = StoppingTokenSource.Token;
    }

    /// <summary>
    /// Remembers the configuration, resolves the outbox service from its implementation type,
    /// derives the lock name from the database name and starts the timer.
    /// </summary>
    public virtual Task StartAsync(OutboxConfig outboxConfig, CancellationToken cancellationToken = default)
    {
        OutboxConfig = outboxConfig;
        Outbox = (IEventOutbox)ServiceProvider.GetRequiredService(outboxConfig.ImplementationType);
        DistributedLockName = $"AbpOutbox_{OutboxConfig.DatabaseName}";
        Timer.Start(cancellationToken);
        return Task.CompletedTask;
    }

    /// <summary>Signals cancellation, stops the timer and disposes the token source.</summary>
    public virtual Task StopAsync(CancellationToken cancellationToken = default)
    {
        StoppingTokenSource.Cancel();
        Timer.Stop(cancellationToken);
        StoppingTokenSource.Dispose();
        return Task.CompletedTask;
    }

    // Timer callback: each tick runs one sending pass.
    private async Task TimerOnElapsed(AbpAsyncTimer arg)
    {
        await RunAsync();
    }

    /// <summary>
    /// One sending pass. Does nothing once stopping was requested. With the distributed lock
    /// held, it drains the outbox batch by batch, publishing either in batch or one by one
    /// depending on <c>BatchPublishOutboxEvents</c>; without the lock, it waits
    /// <c>DistributedLockWaitDuration</c> before returning.
    /// </summary>
    protected virtual async Task RunAsync()
    {
        // Same guard as InboxProcessor.RunAsync: do not try to take the lock after a stop request.
        if (StoppingToken.IsCancellationRequested)
        {
            return;
        }

        await using (var handle = await DistributedLock.TryAcquireAsync(
            DistributedLockName, cancellationToken: StoppingToken))
        {
            if (handle != null)
            {
                while (true)
                {
                    // Leave the loop (and release the lock) once nothing is waiting.
                    var waitingEvents = await Outbox.GetWaitingEventsAsync(
                        EventBusBoxesOptions.OutboxWaitingEventMaxCount, StoppingToken);
                    if (waitingEvents.Count <= 0)
                    {
                        break;
                    }

                    // The literal must stay on one line: a regular interpolated string
                    // cannot contain a line break in its text.
                    Logger.LogInformation($"Found {waitingEvents.Count} events in the outbox.");

                    if (EventBusBoxesOptions.BatchPublishOutboxEvents)
                    {
                        await PublishOutgoingMessagesInBatchAsync(waitingEvents);
                    }
                    else
                    {
                        await PublishOutgoingMessagesAsync(waitingEvents);
                    }
                }
            }
            else
            {
                Logger.LogDebug("Could not obtain the distributed lock: " + DistributedLockName);
                try
                {
                    // Wait before returning so the next attempt to take the lock is delayed.
                    await Task.Delay(EventBusBoxesOptions.DistributedLockWaitDuration, StoppingToken);
                }
                catch (TaskCanceledException) { }
            }
        }
    }

    /// <summary>
    /// Publishes the events one at a time; each event is deleted from the outbox
    /// right after its publish call returns.
    /// </summary>
    protected virtual async Task PublishOutgoingMessagesAsync(List<OutgoingEventInfo> waitingEvents)
    {
        foreach (var waitingEvent in waitingEvents)
        {
            await DistributedEventBus
                .AsSupportsEventBoxes()
                .PublishFromOutboxAsync(waitingEvent, OutboxConfig);

            await Outbox.DeleteAsync(waitingEvent.Id);

            Logger.LogInformation($"Sent the event to the message broker with id = {waitingEvent.Id:N}");
        }
    }

    /// <summary>
    /// Publishes all events with one batch call, then deletes them from the outbox with one call.
    /// </summary>
    protected virtual async Task PublishOutgoingMessagesInBatchAsync(List<OutgoingEventInfo> waitingEvents)
    {
        await DistributedEventBus
            .AsSupportsEventBoxes()
            .PublishManyFromOutboxAsync(waitingEvents, OutboxConfig);

        await Outbox.DeleteManyAsync(waitingEvents.Select(x => x.Id).ToArray());

        Logger.LogInformation($"Sent {waitingEvents.Count} events to message broker");
    }
}
👍🎉🎊