IBackgroundWorker
éèŠéå€æ§è¡çä»»å¡
åå°äœäžéèŠæå¯åšååæ¢æ¹æ³ -> IRunnable,åäŸ
public interface IBackgroundWorker : IRunnable, ISingletonDependency
{
}
æåºç¡çæœè±¡
namespace Volo.Abp.BackgroundWorkers;
public abstract class BackgroundWorkerBase : IBackgroundWorker
{
public IAbpLazyServiceProvider LazyServiceProvider { get; set; } = default!;
public IServiceProvider ServiceProvider { get; set; } = default!;
protected ILoggerFactory LoggerFactory =>
LazyServiceProvider.LazyGetRequiredService<ILoggerFactory>();
protected ILogger Logger =>
LazyServiceProvider.LazyGetService<ILogger>
(provider => LoggerFactory?.CreateLogger(GetType().FullName!) ?? NullLogger.Instance);
//åå°ä»»å¡åæ¢å对å€éç¥
protected CancellationTokenSource StoppingTokenSource { get; }
protected CancellationToken StoppingToken { get; }
public BackgroundWorkerBase()
{
StoppingTokenSource = new CancellationTokenSource();
StoppingToken = StoppingTokenSource.Token;
}
public virtual Task StartAsync(CancellationToken cancellationToken = default)
{
Logger.LogDebug("Started background worker: " + ToString());
return Task.CompletedTask;
}
public virtual Task StopAsync(CancellationToken cancellationToken = default)
{
Logger.LogDebug("Stopped background worker: " + ToString());
StoppingTokenSource.Cancel();
StoppingTokenSource.Dispose();
return Task.CompletedTask;
}
public override string ToString()
{
return GetType().FullName!;
}
}
AsyncPeriodicBackgroundWorkerBase è¿äžæ¥çæœè±¡ æäŸ DoWorkAsync æ¹æ³å ±å ·äœå®ç°
public abstract class AsyncPeriodicBackgroundWorkerBase : BackgroundWorkerBase
{
protected IServiceScopeFactory ServiceScopeFactory { get; }
protected AbpAsyncTimer Timer { get; }
protected CancellationToken StartCancellationToken { get; set; }
protected AsyncPeriodicBackgroundWorkerBase(
AbpAsyncTimer timer,
IServiceScopeFactory serviceScopeFactory)
{
ServiceScopeFactory = serviceScopeFactory;
Timer = timer;
Timer.Elapsed = Timer_Elapsed;
}
public async override Task StartAsync(CancellationToken cancellationToken = default)
{
StartCancellationToken = cancellationToken;
await base.StartAsync(cancellationToken);
Timer.Start(cancellationToken);
}
public async override Task StopAsync(CancellationToken cancellationToken = default)
{
Timer.Stop(cancellationToken);
await base.StopAsync(cancellationToken);
}
private async Task Timer_Elapsed(AbpAsyncTimer timer)
{
await DoWorkAsync(StartCancellationToken);
}
private async Task DoWorkAsync(CancellationToken cancellationToken = default)
{
using (var scope = ServiceScopeFactory.CreateScope())
{
try
{
await DoWorkAsync(new PeriodicBackgroundWorkerContext(
scope.ServiceProvider, cancellationToken));
}
catch (Exception ex)
{
await scope.ServiceProvider
.GetRequiredService<IExceptionNotifier>()
.NotifyAsync(new ExceptionNotificationContext(ex));
Logger.LogException(ex);
}
}
}
//å¯ä»¥å®ç°æ€æ¹æ³,å®ç°åšææ§çä»»å¡
protected abstract Task DoWorkAsync(PeriodicBackgroundWorkerContext workerContext);
}
IBackgroundJobExecuter
public class JobExecutionContext : IServiceProviderAccessor
{
public IServiceProvider ServiceProvider { get; }
public Type JobType { get; }
public object JobArgs { get; }
public CancellationToken CancellationToken { get; }
public JobExecutionContext(
IServiceProvider serviceProvider,
Type jobType,
object jobArgs,
CancellationToken cancellationToken = default)
{
ServiceProvider = serviceProvider;
JobType = jobType;
JobArgs = jobArgs;
CancellationToken = cancellationToken;
}
}
public interface IBackgroundJobExecuter
{
Task ExecuteAsync(JobExecutionContext context);
}
BackgroundJobWorker IBackgroundJobWorker çé»è®€å®ç°
BackgroundJobWorker IBackgroundJobWorker çå®ç°,éè¿ AbpAsyncTimer åå°ç
public class BackgroundJobWorker : AsyncPeriodicBackgroundWorkerBase, IBackgroundJobWorker
{
protected const string DistributedLockName = "AbpBackgroundJobWorker";
protected AbpBackgroundJobOptions JobOptions { get; }
protected AbpBackgroundJobWorkerOptions WorkerOptions { get; }
protected IAbpDistributedLock DistributedLock { get; }
public BackgroundJobWorker(
AbpAsyncTimer timer,
IOptions<AbpBackgroundJobOptions> jobOptions,
IOptions<AbpBackgroundJobWorkerOptions> workerOptions,
IServiceScopeFactory serviceScopeFactory,
IAbpDistributedLock distributedLock)
: base(
timer,
serviceScopeFactory)
{
DistributedLock = distributedLock;
WorkerOptions = workerOptions.Value;
JobOptions = jobOptions.Value;
Timer.Period = WorkerOptions.JobPollPeriod;
}
protected override async Task DoWorkAsync(PeriodicBackgroundWorkerContext workerContext)
{
await using (var handler = await DistributedLock.TryAcquireAsync(DistributedLockName, cancellationToken: StoppingToken))
{
if (handler != null)
{
var store = workerContext.ServiceProvider.GetRequiredService<IBackgroundJobStore>();
var waitingJobs = await store.GetWaitingJobsAsync(WorkerOptions.MaxJobFetchCount);
if (!waitingJobs.Any())
{
return;
}
var jobExecuter = workerContext.ServiceProvider.GetRequiredService<IBackgroundJobExecuter>();
var clock = workerContext.ServiceProvider.GetRequiredService<IClock>();
var serializer = workerContext.ServiceProvider.GetRequiredService<IBackgroundJobSerializer>();
foreach (var jobInfo in waitingJobs)
{
jobInfo.TryCount++;
jobInfo.LastTryTime = clock.Now;
try
{
var jobConfiguration = JobOptions.GetJob(jobInfo.JobName);
var jobArgs = serializer.Deserialize(jobInfo.JobArgs, jobConfiguration.ArgsType);
var context = new JobExecutionContext(
workerContext.ServiceProvider,
jobConfiguration.JobType,
jobArgs,
workerContext.CancellationToken);
try
{
await jobExecuter.ExecuteAsync(context);
await store.DeleteAsync(jobInfo.Id);
}
catch (BackgroundJobExecutionException)
{
var nextTryTime = CalculateNextTryTime(jobInfo, clock);
if (nextTryTime.HasValue)
{
jobInfo.NextTryTime = nextTryTime.Value;
}
else
{
jobInfo.IsAbandoned = true;
}
await TryUpdateAsync(store, jobInfo);
}
}
catch (Exception ex)
{
Logger.LogException(ex);
jobInfo.IsAbandoned = true;
await TryUpdateAsync(store, jobInfo);
}
}
}
else
{
try
{
await Task.Delay(WorkerOptions.JobPollPeriod * 12, StoppingToken);
}
catch (TaskCanceledException) { }
}
}
}
protected virtual async Task TryUpdateAsync(IBackgroundJobStore store, BackgroundJobInfo jobInfo)
{
try
{
await store.UpdateAsync(jobInfo);
}
catch (Exception updateEx)
{
Logger.LogException(updateEx);
}
}
protected virtual DateTime? CalculateNextTryTime(BackgroundJobInfo jobInfo, IClock clock)
{
var nextWaitDuration = WorkerOptions.DefaultFirstWaitDuration *
(Math.Pow(WorkerOptions.DefaultWaitFactor, jobInfo.TryCount - 1));
var nextTryDate = jobInfo.LastTryTime?.AddSeconds(nextWaitDuration) ??
clock.Now.AddSeconds(nextWaitDuration);
if (nextTryDate.Subtract(jobInfo.CreationTime).TotalSeconds > WorkerOptions.DefaultTimeout)
{
return null;
}
return nextTryDate;
}
}
ððð