ABP AbpThreadingModule
ABP AbpThreadingModule
2023/6/1
➡️

AbpThreadingModule

public class AbpThreadingModule : AbpModule
{
    public override void ConfigureServices(ServiceConfigurationContext context)
    {
        context.Services.AddSingleton<ICancellationTokenProvider>(
          NullCancellationTokenProvider.Instance);
        context.Services.AddSingleton(typeof(IAmbientScopeProvider<>),
        typeof(AmbientDataContextAmbientScopeProvider<>));
    }
}

IAmbientDataContext , AmbientDataContextAmbientScopeProvider

多线程的环境上下文,AsyncLocal 可以在 await 里使用,环境上下文给初始数据,每个 scope 里可 以自己改写

public interface IAmbientDataContext
{
    void SetData(string key, object? value);

    object? GetData(string key);
}

public class AsyncLocalAmbientDataContext : IAmbientDataContext, ISingletonDependency
{
    //并发字典加 在 await 里可以传递的的上下文数据
    private static readonly ConcurrentDictionary<string, AsyncLocal<object?>>
     AsyncLocalDictionary = new ConcurrentDictionary<string, AsyncLocal<object?>>();

    public void SetData(string key, object? value)
    {
        var asyncLocal = AsyncLocalDictionary.GetOrAdd(key, (k) => new AsyncLocal<object?>());
        asyncLocal.Value = value;
    }

    public object? GetData(string key)
    {
        var asyncLocal = AsyncLocalDictionary.GetOrAdd(key, (k) => new AsyncLocal<object?>());
        return asyncLocal.Value;
    }
}

环境上下文内 开启 scope ,scope 内可以层层形成嵌套, 通过 IAmbientDataContext 保存 key 和 id 在 ScopeDictionary 里通过 id 找到 value


public interface IAmbientScopeProvider<T>
{
    //在 scope 范围内, 给定 key  获取值
    T? GetValue(string contextKey);

    // 开启一个 scope 把 key 和 value 保存进去
    IDisposable BeginScope(string contextKey, T value);
}

public class AmbientDataContextAmbientScopeProvider<T> : IAmbientScopeProvider<T>
{
    public ILogger<AmbientDataContextAmbientScopeProvider<T>> Logger { get; set; }

    //为了在上下文和线程里传递数据,使用字典保存环境数据
    private static readonly ConcurrentDictionary<string, ScopeItem>
     ScopeDictionary = new ConcurrentDictionary<string, ScopeItem>();

    private readonly IAmbientDataContext _dataContext;

    public AmbientDataContextAmbientScopeProvider([NotNull] IAmbientDataContext dataContext)
    {
        Check.NotNull(dataContext, nameof(dataContext));

        _dataContext = dataContext;

        Logger = NullLogger<AmbientDataContextAmbientScopeProvider<T>>.Instance;
    }

    public T? GetValue(string contextKey)
    {
        var item = GetCurrentItem(contextKey);
        if (item == null)
        {
            return default;
        }

        return item.Value;
    }


    public IDisposable BeginScope(string contextKey, T value)
    {
        var item = new ScopeItem(value, GetCurrentItem(contextKey));

        if (!ScopeDictionary.TryAdd(item.Id, item))
        {
            throw new AbpException(
              "Can not add item! ScopeDictionary.TryAdd returns false!");
        }

        _dataContext.SetData(contextKey, item.Id);

        return new DisposeAction<ValueTuple<ConcurrentDictionary<string,
        ScopeItem>, ScopeItem, IAmbientDataContext, string>>(static (state) =>
        {
            var (scopeDictionary, item,  dataContext, contextKey) = state;

            scopeDictionary.TryRemove(item.Id, out item);

            if (item == null)
            {
                return;
            }

            if (item.Outer == null)
            {
                dataContext.SetData(contextKey, null);
                return;
            }

            dataContext.SetData(contextKey, item.Outer.Id);

        }, (ScopeDictionary, item, _dataContext, contextKey));
    }

    private ScopeItem? GetCurrentItem(string contextKey)
    {
        //现在环境上下文里找,没有在 scope 链的上级找,一直找到最顶层,找不到返回 null
        return _dataContext.GetData(contextKey) is string objKey ?
         ScopeDictionary.GetOrDefault(objKey) : null;
    }

    private class ScopeItem
    {
        public string Id { get; }

        public ScopeItem? Outer { get; }

        public T Value { get; }

        public ScopeItem(T value, ScopeItem? outer = null)
        {
            Id = Guid.NewGuid().ToString();

            Value = value;
            Outer = outer;
        }
    }
}

演示 demo 结果

scope1 key1: value1 scope1 key2: task1 scope1 key1: value1 task1 scope1 key2: scope2 key1: value2 scope2 key2: task2 scope2 key1: value2 task2 scope2 key2: scope3 key1: value2 scope3 key2: value3 task3 scope3 key1: value2 task3 scope3 key2: value3 value1

[HttpGet("get3")]
public async Task<IActionResult>
 Get3([FromServices] IAmbientScopeProvider<string> ambientScopeProvider) {
    using (ambientScopeProvider.BeginScope("key1", "value1")) {
      Console.WriteLine("scope1 key1: " +ambientScopeProvider.GetValue("key1"));
      Console.WriteLine("scope1 key2:" +ambientScopeProvider.GetValue("key2"));
      await Task.Run(async () => {
        await Task.Delay(100);
        Console.WriteLine("task1 scope1 key1: " +ambientScopeProvider.GetValue("key1"));
        Console.WriteLine("task1 scope1 key2:" +ambientScopeProvider.GetValue("key2"));
      });
      using (ambientScopeProvider.BeginScope("key1", "value2")) {
        Console.WriteLine("scope2 key1: " + ambientScopeProvider.GetValue("key1"));
        Console.WriteLine("scope2 key2: " + ambientScopeProvider.GetValue("key2"));
        await Task.Run(async () => {
          await Task.Delay(100);
          Console.WriteLine("task2 scope2 key1: " + ambientScopeProvider.GetValue("key1"));
          Console.WriteLine("task2 scope2 key2: " + ambientScopeProvider.GetValue("key2"));
        });
        using (ambientScopeProvider.BeginScope("key2", "value3")) {
          Console.WriteLine("scope3 key1: " + ambientScopeProvider.GetValue("key1"));
          Console.WriteLine("scope3 key2: " + ambientScopeProvider.GetValue("key2"));
          await Task.Run(async () => {
            await Task.Delay(100);
            Console.WriteLine("task3 scope3 key1: " + ambientScopeProvider.GetValue("key1"));
            Console.WriteLine("task3 scope3 key2: " + ambientScopeProvider.GetValue("key2"));
          });
        }
      }
      //输出  key1 值
      Console.WriteLine(ambientScopeProvider.GetValue("key1"));
      Console.WriteLine(ambientScopeProvider.GetValue("key2"));
    }
    //没有输出
    Console.WriteLine(ambientScopeProvider.GetValue("key1"));
    Console.WriteLine(ambientScopeProvider.GetValue("key2"));
    return Ok("ok");
}

ICancellationTokenProvider

ICancellationTokenProvider CancellationToken 提供器,不同实现者要提供 CancellationToken 也 可以用 Use 方法重写,重写后返回再次获得就是重写的(CancellationToken Token)

public interface ICancellationTokenProvider
{
    CancellationToken Token { get; }

    IDisposable Use(CancellationToken cancellationToken);
}

public class CancellationTokenOverride
{
    public CancellationToken CancellationToken { get; }

    public CancellationTokenOverride(CancellationToken cancellationToken)
    {
        CancellationToken = cancellationToken;
    }
}
public abstract class CancellationTokenProviderBase : ICancellationTokenProvider
{
    public const string CancellationTokenOverrideContextKey =
     "Volo.Abp.Threading.CancellationToken.Override";

    public abstract CancellationToken Token { get; }

    protected IAmbientScopeProvider<CancellationTokenOverride>
     CancellationTokenOverrideScopeProvider { get; }

    protected CancellationTokenOverride? OverrideValue =>
    CancellationTokenOverrideScopeProvider.GetValue(CancellationTokenOverrideContextKey);

    protected CancellationTokenProviderBase(
      IAmbientScopeProvider<CancellationTokenOverride> cancellationTokenOverrideScopeProvider)
    {
        CancellationTokenOverrideScopeProvider = cancellationTokenOverrideScopeProvider;
    }

    public IDisposable Use(CancellationToken cancellationToken)
    {
        return CancellationTokenOverrideScopeProvider.
        BeginScope(CancellationTokenOverrideContextKey,
         new CancellationTokenOverride(cancellationToken));
    }
}

本模块自己的实现 ,没有提供任何

public class NullCancellationTokenProvider : CancellationTokenProviderBase
{
    public static NullCancellationTokenProvider Instance { get; } = new();

    public override CancellationToken Token =>
    OverrideValue?.CancellationToken ?? CancellationToken.None;

    private NullCancellationTokenProvider()
        : base(new AmbientDataContextAmbientScopeProvider<CancellationTokenOverride>
        (new AsyncLocalAmbientDataContext()))
    {
    }
}

Volo.Abp.AspNetCore 模块实现 CancellationTokenProviderBase, 在 http 请求被取消的时候发出 一个 CancellationToken 利用此 CancellationToken 可以取消一切读取当前真正指向的任务

namespace Volo.Abp.AspNetCore.Threading;
[Dependency(ReplaceServices = true)]
public class HttpContextCancellationTokenProvider :
CancellationTokenProviderBase, ITransientDependency
{
    private readonly IHttpContextAccessor _httpContextAccessor;

    public override CancellationToken Token {
        get {
            if (OverrideValue != null)
            {
                return OverrideValue.CancellationToken;
            }
            return _httpContextAccessor.HttpContext?.RequestAborted ??
             CancellationToken.None;
        }
    }

    public HttpContextCancellationTokenProvider(
        IAmbientScopeProvider<CancellationTokenOverride>
        cancellationTokenOverrideScopeProvider,
        IHttpContextAccessor httpContextAccessor)
        : base(cancellationTokenOverrideScopeProvider)
    {
        _httpContextAccessor = httpContextAccessor;
    }
}

AbpAsyncTimer

限制启动和停止方法的启动关系,通过 lock 及 Monitor.Wait 及 Monitor.Pulse

public class AbpAsyncTimer : ITransientDependency
{
    /// <summary>
    /// This func is raised periodically according to Period of Timer.
    /// </summary>
    public Func<AbpAsyncTimer, Task> Elapsed = _ => Task.CompletedTask;

    /// <summary>
    /// Task period of timer (as milliseconds).
    /// </summary>
    public int Period { get; set; }

    /// <summary>
    /// Indicates whether timer raises Elapsed event on Start method of Timer for once.
    /// Default: False.
    /// </summary>
    public bool RunOnStart { get; set; }

    public ILogger<AbpAsyncTimer> Logger { get; set; }

    public IExceptionNotifier ExceptionNotifier { get; set; }

    private readonly Timer _taskTimer;
    private volatile bool _performingTasks;
    private volatile bool _isRunning;

    public AbpAsyncTimer()
    {
        ExceptionNotifier = NullExceptionNotifier.Instance;
        Logger = NullLogger<AbpAsyncTimer>.Instance;

        _taskTimer = new Timer(
            TimerCallBack!,
            null,
            Timeout.Infinite,
            Timeout.Infinite
        );
    }

    public void Start(CancellationToken cancellationToken = default)
    {
        if (Period <= 0)
        {
            throw new AbpException("Period should be set before starting the timer!");
        }

        lock (_taskTimer)
        {
            _taskTimer.Change(RunOnStart ? 0 : Period, Timeout.Infinite);
            _isRunning = true;
        }
    }

    public void Stop(CancellationToken cancellationToken = default)
    {
        lock (_taskTimer)
        {
            _taskTimer.Change(Timeout.Infinite, Timeout.Infinite);
            while (_performingTasks)
            {
                Monitor.Wait(_taskTimer);
            }

            _isRunning = false;
        }
    }

    /// <summary>
    /// This method is called by _taskTimer.
    /// </summary>
    /// <param name="state">Not used argument</param>
    private void TimerCallBack(object state)
    {
        lock (_taskTimer)
        {
            if (!_isRunning || _performingTasks)
            {
                return;
            }

            _taskTimer.Change(Timeout.Infinite, Timeout.Infinite);
            _performingTasks = true;
        }

        _ = Timer_Elapsed();
    }

    private async Task Timer_Elapsed()
    {
        try
        {
            await Elapsed(this);
        }
        catch (Exception ex)
        {
            Logger.LogException(ex);
            await ExceptionNotifier.NotifyAsync(ex);
        }
        finally
        {
            lock (_taskTimer)
            {
                _performingTasks = false;
                if (_isRunning)
                {
                    _taskTimer.Change(Period, Timeout.Infinite);
                }

                Monitor.Pulse(_taskTimer);
            }
        }
    }
}
👍🎉🎊