AbpThreadingModule
public class AbpThreadingModule : AbpModule
{
public override void ConfigureServices(ServiceConfigurationContext context)
{
context.Services.AddSingleton<ICancellationTokenProvider>(
NullCancellationTokenProvider.Instance);
context.Services.AddSingleton(typeof(IAmbientScopeProvider<>),
typeof(AmbientDataContextAmbientScopeProvider<>));
}
}
IAmbientDataContext , AmbientDataContextAmbientScopeProvider
多线程的环境上下文,AsyncLocal 可以在 await 里使用,环境上下文给初始数据,每个 scope 里可 以自己改写
public interface IAmbientDataContext
{
void SetData(string key, object? value);
object? GetData(string key);
}
public class AsyncLocalAmbientDataContext : IAmbientDataContext, ISingletonDependency
{
//并发字典加 在 await 里可以传递的的上下文数据
private static readonly ConcurrentDictionary<string, AsyncLocal<object?>>
AsyncLocalDictionary = new ConcurrentDictionary<string, AsyncLocal<object?>>();
public void SetData(string key, object? value)
{
var asyncLocal = AsyncLocalDictionary.GetOrAdd(key, (k) => new AsyncLocal<object?>());
asyncLocal.Value = value;
}
public object? GetData(string key)
{
var asyncLocal = AsyncLocalDictionary.GetOrAdd(key, (k) => new AsyncLocal<object?>());
return asyncLocal.Value;
}
}
环境上下文内 开启 scope ,scope 内可以层层形成嵌套, 通过 IAmbientDataContext 保存 key 和 id 在 ScopeDictionary 里通过 id 找到 value
public interface IAmbientScopeProvider<T>
{
//在 scope 范围内, 给定 key 获取值
T? GetValue(string contextKey);
// 开启一个 scope 把 key 和 value 保存进去
IDisposable BeginScope(string contextKey, T value);
}
public class AmbientDataContextAmbientScopeProvider<T> : IAmbientScopeProvider<T>
{
public ILogger<AmbientDataContextAmbientScopeProvider<T>> Logger { get; set; }
//为了在上下文和线程里传递数据,使用字典保存环境数据
private static readonly ConcurrentDictionary<string, ScopeItem>
ScopeDictionary = new ConcurrentDictionary<string, ScopeItem>();
private readonly IAmbientDataContext _dataContext;
public AmbientDataContextAmbientScopeProvider([NotNull] IAmbientDataContext dataContext)
{
Check.NotNull(dataContext, nameof(dataContext));
_dataContext = dataContext;
Logger = NullLogger<AmbientDataContextAmbientScopeProvider<T>>.Instance;
}
public T? GetValue(string contextKey)
{
var item = GetCurrentItem(contextKey);
if (item == null)
{
return default;
}
return item.Value;
}
public IDisposable BeginScope(string contextKey, T value)
{
var item = new ScopeItem(value, GetCurrentItem(contextKey));
if (!ScopeDictionary.TryAdd(item.Id, item))
{
throw new AbpException(
"Can not add item! ScopeDictionary.TryAdd returns false!");
}
_dataContext.SetData(contextKey, item.Id);
return new DisposeAction<ValueTuple<ConcurrentDictionary<string,
ScopeItem>, ScopeItem, IAmbientDataContext, string>>(static (state) =>
{
var (scopeDictionary, item, dataContext, contextKey) = state;
scopeDictionary.TryRemove(item.Id, out item);
if (item == null)
{
return;
}
if (item.Outer == null)
{
dataContext.SetData(contextKey, null);
return;
}
dataContext.SetData(contextKey, item.Outer.Id);
}, (ScopeDictionary, item, _dataContext, contextKey));
}
private ScopeItem? GetCurrentItem(string contextKey)
{
//现在环境上下文里找,没有在 scope 链的上级找,一直找到最顶层,找不到返回 null
return _dataContext.GetData(contextKey) is string objKey ?
ScopeDictionary.GetOrDefault(objKey) : null;
}
private class ScopeItem
{
public string Id { get; }
public ScopeItem? Outer { get; }
public T Value { get; }
public ScopeItem(T value, ScopeItem? outer = null)
{
Id = Guid.NewGuid().ToString();
Value = value;
Outer = outer;
}
}
}
演示 demo 结果
scope1 key1: value1 scope1 key2: task1 scope1 key1: value1 task1 scope1 key2: scope2 key1: value2 scope2 key2: task2 scope2 key1: value2 task2 scope2 key2: scope3 key1: value2 scope3 key2: value3 task3 scope3 key1: value2 task3 scope3 key2: value3 value1
[HttpGet("get3")]
public async Task<IActionResult>
Get3([FromServices] IAmbientScopeProvider<string> ambientScopeProvider) {
using (ambientScopeProvider.BeginScope("key1", "value1")) {
Console.WriteLine("scope1 key1: " +ambientScopeProvider.GetValue("key1"));
Console.WriteLine("scope1 key2:" +ambientScopeProvider.GetValue("key2"));
await Task.Run(async () => {
await Task.Delay(100);
Console.WriteLine("task1 scope1 key1: " +ambientScopeProvider.GetValue("key1"));
Console.WriteLine("task1 scope1 key2:" +ambientScopeProvider.GetValue("key2"));
});
using (ambientScopeProvider.BeginScope("key1", "value2")) {
Console.WriteLine("scope2 key1: " + ambientScopeProvider.GetValue("key1"));
Console.WriteLine("scope2 key2: " + ambientScopeProvider.GetValue("key2"));
await Task.Run(async () => {
await Task.Delay(100);
Console.WriteLine("task2 scope2 key1: " + ambientScopeProvider.GetValue("key1"));
Console.WriteLine("task2 scope2 key2: " + ambientScopeProvider.GetValue("key2"));
});
using (ambientScopeProvider.BeginScope("key2", "value3")) {
Console.WriteLine("scope3 key1: " + ambientScopeProvider.GetValue("key1"));
Console.WriteLine("scope3 key2: " + ambientScopeProvider.GetValue("key2"));
await Task.Run(async () => {
await Task.Delay(100);
Console.WriteLine("task3 scope3 key1: " + ambientScopeProvider.GetValue("key1"));
Console.WriteLine("task3 scope3 key2: " + ambientScopeProvider.GetValue("key2"));
});
}
}
//输出 key1 值
Console.WriteLine(ambientScopeProvider.GetValue("key1"));
Console.WriteLine(ambientScopeProvider.GetValue("key2"));
}
//没有输出
Console.WriteLine(ambientScopeProvider.GetValue("key1"));
Console.WriteLine(ambientScopeProvider.GetValue("key2"));
return Ok("ok");
}
ICancellationTokenProvider
ICancellationTokenProvider CancellationToken 提供器,不同实现者要提供 CancellationToken 也 可以用 Use 方法重写,重写后返回再次获得就是重写的(CancellationToken Token)
public interface ICancellationTokenProvider
{
CancellationToken Token { get; }
IDisposable Use(CancellationToken cancellationToken);
}
public class CancellationTokenOverride
{
public CancellationToken CancellationToken { get; }
public CancellationTokenOverride(CancellationToken cancellationToken)
{
CancellationToken = cancellationToken;
}
}
public abstract class CancellationTokenProviderBase : ICancellationTokenProvider
{
public const string CancellationTokenOverrideContextKey =
"Volo.Abp.Threading.CancellationToken.Override";
public abstract CancellationToken Token { get; }
protected IAmbientScopeProvider<CancellationTokenOverride>
CancellationTokenOverrideScopeProvider { get; }
protected CancellationTokenOverride? OverrideValue =>
CancellationTokenOverrideScopeProvider.GetValue(CancellationTokenOverrideContextKey);
protected CancellationTokenProviderBase(
IAmbientScopeProvider<CancellationTokenOverride> cancellationTokenOverrideScopeProvider)
{
CancellationTokenOverrideScopeProvider = cancellationTokenOverrideScopeProvider;
}
public IDisposable Use(CancellationToken cancellationToken)
{
return CancellationTokenOverrideScopeProvider.
BeginScope(CancellationTokenOverrideContextKey,
new CancellationTokenOverride(cancellationToken));
}
}
本模块自己的实现 ,没有提供任何
public class NullCancellationTokenProvider : CancellationTokenProviderBase
{
public static NullCancellationTokenProvider Instance { get; } = new();
public override CancellationToken Token =>
OverrideValue?.CancellationToken ?? CancellationToken.None;
private NullCancellationTokenProvider()
: base(new AmbientDataContextAmbientScopeProvider<CancellationTokenOverride>
(new AsyncLocalAmbientDataContext()))
{
}
}
Volo.Abp.AspNetCore 模块实现 CancellationTokenProviderBase, 在 http 请求被取消的时候发出 一个 CancellationToken 利用此 CancellationToken 可以取消一切读取当前真正指向的任务
namespace Volo.Abp.AspNetCore.Threading;
[Dependency(ReplaceServices = true)]
public class HttpContextCancellationTokenProvider :
CancellationTokenProviderBase, ITransientDependency
{
private readonly IHttpContextAccessor _httpContextAccessor;
public override CancellationToken Token {
get {
if (OverrideValue != null)
{
return OverrideValue.CancellationToken;
}
return _httpContextAccessor.HttpContext?.RequestAborted ??
CancellationToken.None;
}
}
public HttpContextCancellationTokenProvider(
IAmbientScopeProvider<CancellationTokenOverride>
cancellationTokenOverrideScopeProvider,
IHttpContextAccessor httpContextAccessor)
: base(cancellationTokenOverrideScopeProvider)
{
_httpContextAccessor = httpContextAccessor;
}
}
AbpAsyncTimer
限制启动和停止方法的启动关系,通过 lock 及 Monitor.Wait 及 Monitor.Pulse
public class AbpAsyncTimer : ITransientDependency
{
/// <summary>
/// This func is raised periodically according to Period of Timer.
/// </summary>
public Func<AbpAsyncTimer, Task> Elapsed = _ => Task.CompletedTask;
/// <summary>
/// Task period of timer (as milliseconds).
/// </summary>
public int Period { get; set; }
/// <summary>
/// Indicates whether timer raises Elapsed event on Start method of Timer for once.
/// Default: False.
/// </summary>
public bool RunOnStart { get; set; }
public ILogger<AbpAsyncTimer> Logger { get; set; }
public IExceptionNotifier ExceptionNotifier { get; set; }
private readonly Timer _taskTimer;
private volatile bool _performingTasks;
private volatile bool _isRunning;
public AbpAsyncTimer()
{
ExceptionNotifier = NullExceptionNotifier.Instance;
Logger = NullLogger<AbpAsyncTimer>.Instance;
_taskTimer = new Timer(
TimerCallBack!,
null,
Timeout.Infinite,
Timeout.Infinite
);
}
public void Start(CancellationToken cancellationToken = default)
{
if (Period <= 0)
{
throw new AbpException("Period should be set before starting the timer!");
}
lock (_taskTimer)
{
_taskTimer.Change(RunOnStart ? 0 : Period, Timeout.Infinite);
_isRunning = true;
}
}
public void Stop(CancellationToken cancellationToken = default)
{
lock (_taskTimer)
{
_taskTimer.Change(Timeout.Infinite, Timeout.Infinite);
while (_performingTasks)
{
Monitor.Wait(_taskTimer);
}
_isRunning = false;
}
}
/// <summary>
/// This method is called by _taskTimer.
/// </summary>
/// <param name="state">Not used argument</param>
private void TimerCallBack(object state)
{
lock (_taskTimer)
{
if (!_isRunning || _performingTasks)
{
return;
}
_taskTimer.Change(Timeout.Infinite, Timeout.Infinite);
_performingTasks = true;
}
_ = Timer_Elapsed();
}
private async Task Timer_Elapsed()
{
try
{
await Elapsed(this);
}
catch (Exception ex)
{
Logger.LogException(ex);
await ExceptionNotifier.NotifyAsync(ex);
}
finally
{
lock (_taskTimer)
{
_performingTasks = false;
if (_isRunning)
{
_taskTimer.Change(Period, Timeout.Infinite);
}
Monitor.Pulse(_taskTimer);
}
}
}
}