c#事务
要想弄清楚 abp 的事务,前提要了解 c# 事务的边界
最早的开始的 ado net 是通过 SqlConnection 开启一个事务 SqlTransaction 是在 DbTransaction 上封装的
public sealed class SqlTransaction : DbTransaction{}
using Microsoft.Data.SqlClient;
using (SqlConnection connection = new SqlConnection(''))
{
connection.Open();
using (SqlTransaction transaction = connection.BeginTransaction())
{
try
{
// Perform transactional operations here
transaction.Commit();
}
catch (Exception ex)
{
// Handle the exception and roll back the transaction
transaction.Rollback();
}
}
}
跨多个资源管理器的分布式事务协调
System.Transactions 提供了更灵活的事务
System.Transactions 命名空间提供基于 Transaction 类的显式编程模型和使用 TransactionScope 类的隐式编程模型
显示事务 CommittableTransaction
显示事务,可提交的事务
public sealed class CommittableTransaction : Transaction, IAsyncResult{
}
需要手动的把事务给到连接 myConnection.EnlistTransaction(tx); 创建嵌套事务也需要自己写代码
var tx = new CommittableTransaction();
SqlConnection myConnection = new SqlConnection("");
SqlCommand myCommand = new SqlCommand();
//Open the SQL connection
myConnection.Open();
//Give the transaction to SQL to enlist with
myConnection.EnlistTransaction(tx);
myCommand.Connection = myConnection;
try{
myCommand.CommandText = "";
myCommand.ExecuteNonQuery();
tx.Commit();
}catch(Exception){
tx.Rollback();
}finally{
myConnection.Close();
tx = null;
}
隐式事务 TransactionScope
try {
using (TransactionScope ts = new TransactionScope()) {
using (SqlConnection connection1 = new SqlConnection("")){
connection1.Open();
SqlCommand command1 = new SqlCommand("", connection1);
returnValue = command1.ExecuteNonQuery();
}
//如果过程中发生任何异常 Complete 方法不会提交,而是自动进行提交
// ts.Complete(); 可以不写 ts 对象在 dispose 的时候会自动提交,提交过程中有异常自动回滚
//任然可以使用 try catch 进行提交及回滚
ts.Complete();
}
}
catch (Exception e) {
}
TransactionScope 可以完美的进行嵌套,如下的事务默认会用同一个 事务进行处理只要嵌套的内外事 务都成功时候,才不会回滚,否则任一个事务异常,内外事务都会回滚
void RootMethod()
{
using(TransactionScope scope = new TransactionScope())
{
/* Perform transactional work here */
SomeMethod();
scope.Complete();
}
}
void SomeMethod()
{
using(TransactionScope scope = new TransactionScope())
{
/* Perform transactional work here */
scope.Complete();
}
}
事务的关联性 TransactionScopeOption 在 new TransactionScope() 的时候传入 TransactionScopeOption 枚举
public enum TransactionScopeOption
{
/// <summary> A transaction is required by the scope. It uses an ambient transaction
///if one already exists. Otherwise, it creates a new
///transaction before entering the scope. This is the default value.
存在环境事务,就使用环境的,不存在就创建新的 </summary>
Required,
/// <summary>A new transaction is always created for the scope.
/// 直接创建新的 scope 事务 </summary>
RequiresNew,
/// <summary>The ambient transaction context is suppressed when
/// creating the scope. All operations within the scope are
/// done without an ambient transaction context. 不使用任何事务 </summary>
Suppress,
}
System.Transactions 没有提供事务的保存点功能
如果要是实现保存点功能,只能通过事务嵌套的方式实现
using (TransactionScope ts = new TransactionScope(TransactionScopeOption.Required)) {
// ts1 执行失败不会影响 ts2
using (TransactionScope ts1 = new TransactionScope(TransactionScopeOption.RequiresNew)) {
ts1.Complete();
}
using (TransactionScope ts2 = new TransactionScope(TransactionScopeOption.RequiresNew)) {
ts2.Complete();
}
ts.Complete();
}
事务的隔离级别 事务的隔离是事务之间互相的影响程度
数据库事务要满足 4 大原则:
- 原子性 已经提交的事务不能再次提交
- 隔离性 事务之间数据可以互相隔离,不可见
- 持久性 数据提交到数据库,数据库要保证数据不会丢失
- 一致性 上面三条原则来保证一致性
隔离性主要通过锁来实现: 1: 共享锁(读锁) 2:排它锁(编辑或删除锁,独占锁) 3:行级锁 ,一 种细粒度的锁,它可以在事务中对数据库中的单个行进行加锁,以保证数据的一致性和隔离性
-
Read Uncommitted 读未提交,默认级别它是最低级别,事务之间没有隔离,可能导致脏读取、不可 重复读取和幻读取等问题
-
Read Committed 读已提交,它可以防止脏读,但仍然会导致不可重复读和幻读。
在"读已提交"隔离级别下,当一个事务读取某一行数据后,其他事务可以修改该行数据,但这些修 改对于当前正在执行的事务是不可见的,直到修改事务提交。这确保了在"读已提交"隔离级别下不 会出现脏读(Dirty Read)问题,因为事务只能读取已提交的数据,而不会读取其他事务尚未提交 的数据。
此隔离级别用到了共享锁和排它锁进行配合,共享锁是一种读锁,它允许多个事务同时读取同一份 数据,但不允许其他事务对该数据进行修改。共享锁是一种共享资源,多个事务可以同时持有共享 锁,不会相互阻塞。只有当一个事务持有共享锁时,其他事务才能获取共享锁,此时排他锁会被阻 塞直到所有共享锁都释放。这种机制确保了在"读已提交"隔离级别下,多个事务可以同时读取数据 ,但只有一个事务能够修改数据。排他锁是一种(写,删除)锁,它允许一个事务独占地对数据进 行修改,删除,其他事务无法同时持有排他锁或共享锁。当一个事务持有排他锁时,其他事务无法 获取共享锁或排他锁,它们会被阻塞直到排他锁被释放。这种机制确保了在"读已提交"隔离级别下 ,只有一个事务能够修改或删除数据,其他事务无法执行任何操作。
不可重复读:产生原因,之前读取的数据,之后被其他事务修改了,再次读取数据变了。幻读:之 前按照一定规则读取了多行数据,之后被其他事务删除或新增了部分数据,再次读取数据量变了
-
Repeatable Read 可重复读取,会有幻读, insert 插入导致的,加的是行级锁 当一个事务开始时 ,它会获取需要读取或修改的数据行的行级锁。这个锁会在事务提交或回滚之后释放。其他事务如 果要读取或修改或删除此行数据的操作会被阻塞,直到行级锁被释放。
在这个隔离级别下,一个事务在执行期间看到的数据保持一致,这可以防止不可重复读问题,但仍 然可能出现幻读(Phantom Read)问题,即一个事务在两次查询之间看到了新增的数据
-
Serializable 可序列化的对所有的事务进行排队执行,性能最差
IDbContextTransaction 在 IDbTransaction 继承上的封装
public interface IDbContextTransaction : IDisposable, IAsyncDisposable
{
Guid TransactionId { get; }
void Commit();
Task CommitAsync(CancellationToken cancellationToken = default);
void Rollback();
Task RollbackAsync(CancellationToken cancellationToken = default);
void CreateSavepoint(string name)
=> throw new NotSupportedException(CoreStrings.SavepointsNotSupported);
Task CreateSavepointAsync(string name, CancellationToken cancellationToken = default)
=> throw new NotSupportedException(CoreStrings.SavepointsNotSupported);
void RollbackToSavepoint(string name)
=> throw new NotSupportedException(CoreStrings.SavepointsNotSupported);
Task RollbackToSavepointAsync(string name, CancellationToken cancellationToken = default)
=> throw new NotSupportedException(CoreStrings.SavepointsNotSupported);
void ReleaseSavepoint(string name)
{
}
Task ReleaseSavepointAsync(string name, CancellationToken cancellationToken = default)
=> Task.CompletedTask;
bool SupportsSavepoint
=> false;
}
public interface IDbTransaction : IDisposable
{
IDbConnection? Connection { get; }
IsolationLevel IsolationLevel { get; }
void Commit();
void Rollback();
}
}
//各个数据库可以实现此类
public abstract class DbTransaction :
MarshalByRefObject,
IDbTransaction,
IDisposable,
IAsyncDisposable
{
}
// IDbContextTransaction 依赖 DbTransaction 实现, 而 DbTransaction 有依赖各个数据库提供者实现
public class RelationalTransaction : IDbContextTransaction, IInfrastructure<DbTransaction>{
private readonly DbTransaction _dbTransaction;
}
public interface IDbContextTransaction : IDisposable, IAsyncDisposable
{
Guid TransactionId { get; }
void Commit();
Task CommitAsync(CancellationToken cancellationToken = default);
void Rollback();
Task RollbackAsync(CancellationToken cancellationToken = default);
void CreateSavepoint(string name)
=> throw new NotSupportedException(CoreStrings.SavepointsNotSupported);
Task CreateSavepointAsync(string name, CancellationToken cancellationToken = default)
=> throw new NotSupportedException(CoreStrings.SavepointsNotSupported);
void RollbackToSavepoint(string name)
=> throw new NotSupportedException(CoreStrings.SavepointsNotSupported);
Task RollbackToSavepointAsync(string name, CancellationToken cancellationToken = default)
=> throw new NotSupportedException(CoreStrings.SavepointsNotSupported);
void ReleaseSavepoint(string name) {}
Task ReleaseSavepointAsync(string name, CancellationToken cancellationToken = default)
=> Task.CompletedTask;
bool SupportsSavepoint
=> false;
}
IDbContextTransaction 不支持分布式事务
现在可以创建共享同一连接的多个上下文实例。 然后使用 DbContext.Database.UseTransaction(DbTransaction) API 在同一事务中登记两个上下文
public class MyContext:DbContext {
private static readonly string ConnectionString
= @"Host=127.0.0.1;Port=5432;Database=postgres;
Username=postgres;Password=admin@123;Pooling=true";
public DbSet<Bar> Bars { get; set; }
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) {
optionsBuilder.UseNpgsql(ConnectionString, optionsBuilder => {
});
}
}
public class MyService {
[NotNull] private readonly MyContext _myContext;
public MyService(MyContext myContext) {
_myContext = myContext;
}
public async Task Func() {
await using var trans = await _myContext.Database.BeginTransactionAsync();
Console.WriteLine(
"myContext dbConnection:"+_myContext.Database.GetDbConnection().GetHashCode());
Console.WriteLine(
"myContext transactionId:"+trans.TransactionId);
try {
await _myContext.AddAsync(new Bar() { Name = "bar name" + DateTime.Now.ToString() });
await _myContext.SaveChangesAsync();
//要想共享同一个事务,必须是同一个 Db Connection
await using var nestedContext = new MyContext();
//先关闭现在的
await nestedContext.Database.GetDbConnection().CloseAsync();
//在设置外层共享的 Db Connection
nestedContext.Database.
SetDbConnection(_myContext.Database.GetDbConnection());
//内层就可以使用外侧的事务了, 这样只要在外侧提交事务就好了,因为它们现在是一个事务
await using var nextedTrans =
await nestedContext.Database.UseTransactionAsync(trans.GetDbTransaction());
try {
Console.WriteLine(
"nestedContext dbConnection:"+nestedContext.Database.
GetDbConnection().GetHashCode());
Console.WriteLine(
"nestedContext transactionId:"+nextedTrans.TransactionId);
await nestedContext.AddAsync(new Bar() { Name = "nested bar name"
+ DateTime.Now.ToString() });
await nestedContext.SaveChangesAsync();
// 共享事务后,只能在外层提交,内存如果提交了外侧就不能在提交了
// 因为同一事务只能提交一次
//await nextedTrans.CommitAsync();
}
catch (Exception e) {
await nextedTrans.RollbackAsync();
throw;
}
await _myContext.AddAsync(new Bar() { Name = "bar name" +
DateTime.Now.ToString() });
await _myContext.SaveChangesAsync();
//事务一旦提交,事务上的数据库连接就会断开
await trans.CommitAsync();
}
catch (Exception e) {
//使用 using 会自动回滚
//await trans?.RollbackAsync();
}
}
}
内外层事务完全不同,不同的连接,不同的上下文,形成的嵌套事务,互相之间的影响要靠用户代码 来管理
DbCommand 生成命令 info: 2023/10/17 11:04:01.843 RelationalEventId.CommandExecuted[20101] (Microsoft.EntityFrameworkCore.Database.Command) Executed DbCommand (2ms) [Parameters=[@p0='c1' (Nullable = false), @p1='c2' (Nullable = false)], CommandType='Text', CommandTimeout='30'] INSERT INTO "Bars" ("Name") VALUES (@p0) RETURNING "Id"; INSERT INTO "Bars" ("Name") VALUES (@p1) RETURNING "Id"; DbTransaction 事 务提交了 SaveChanges 执行了 DbCommand 生成命令 info: 2023/10/17 11:04:01.852 RelationalEventId.CommandExecuted[20101] (Microsoft.EntityFrameworkCore.Database.Command) Executed DbCommand (4ms) [Parameters=[@p0='c3' (Nullable = false)], CommandType='Text', CommandTimeout='30'] INSERT INTO "Bars" ("Name") VALUES (@p0) RETURNING "Id"; SaveChanges 执行了 myContext dbConnection:66464604 myContext transaction:3452a93c-ecd7-43ff-acb3-722b0ee358d5 DbCommand 生成命令 info: 2023/10/17 11:04:01.856 RelationalEventId.CommandExecuted[20101] (Microsoft.EntityFrameworkCore.Database.Command) Executed DbCommand (1ms) [Parameters=[@p0='bar name2023/10/17 11:04:01' (Nullable = false)], CommandType='Text', CommandTimeout='30'] INSERT INTO "Bars" ("Name") VALUES (@p0) RETURNING "Id"; SaveChanges 执行了 nestedContext dbConnection:109847 nestedContext transactionId:468b640d-fd06-45a8-bdee-e5fd124d6c24 DbCommand 生成命令 info: 2023/10/17 11:04:01.862 RelationalEventId.CommandExecuted[20101] (Microsoft.EntityFrameworkCore.Database.Command) Executed DbCommand (1ms) [Parameters=[@p0='nested bar name2023/10/17 11:04:01' (Nullable = false)], CommandType='Text', CommandTimeout='30'] INSERT INTO "Bars" ("Name") VALUES (@p0) RETURNING "Id"; SaveChanges 执行了 DbTransaction 事务提交了 DbCommand 生成命令 info: 2023/10/17 11:04:01.870 RelationalEventId.CommandExecuted[20101] (Microsoft.EntityFrameworkCore.Database.Command) Executed DbCommand (2ms) [Parameters=[@p0='bar name2023/10/17 11:04:01' (Nullable = false)], CommandType='Text', CommandTimeout='30'] INSERT INTO "Bars" ("Name") VALUES (@p0) RETURNING "Id"; SaveChanges 执行了 DbTransaction 事务提交了
// ef 支持的拦截器
public class MyDbTransactionInterceptor : DbTransactionInterceptor {
public override Task TransactionCommittedAsync(DbTransaction transaction,
TransactionEndEventData eventData,
CancellationToken cancellationToken = new CancellationToken()) {
Console.WriteLine("DbTransaction 事务提交了");
return base.TransactionCommittedAsync(transaction, eventData, cancellationToken);
}
}
public class MyDbCommandInterceptor:DbCommandInterceptor {
public override DbCommand CommandCreated(CommandEndEventData eventData,
DbCommand result) {
Console.WriteLine("DbCommand 生成命令");
return base.CommandCreated(eventData, result);
}
}
public class MySaveChangesInterceptor : SaveChangesInterceptor {
public override ValueTask<int> SavedChangesAsync(SaveChangesCompletedEventData
eventData, int result,
CancellationToken cancellationToken = new CancellationToken()) {
Console.WriteLine("SaveChanges 执行了");
return base.SavedChangesAsync(eventData, result, cancellationToken);
}
}
public class MyContext:DbContext {
private static readonly string ConnectionString =
@"Host=127.0.0.1;Port=5432;Database=postgres;
Username=postgres;Password=admin@123;Pooling=true";
public DbSet<Bar> Bars { get; set; }
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) {
optionsBuilder.UseNpgsql(ConnectionString, optionsBuilder => {
});
//注册用户拦截器
optionsBuilder.AddInterceptors(new MyDbCommandInterceptor(),
new MyDbTransactionInterceptor(),new MySaveChangesInterceptor());
//打印日志,如果想看 ef 内部业务的来龙去脉,日志级别写出 debug
optionsBuilder.LogTo(Console.WriteLine,
Microsoft.Extensions.Logging.LogLevel.Information).EnableSensitiveDataLogging();
}
}
public class Bar {
public int Id { get; set; }
public string Name { get; set; }
}
public class MyService {
[NotNull] private readonly MyContext _myContext;
public MyService(MyContext myContext) {
_myContext = myContext;
}
public async Task Func() {
//SaveChangesAsync 会内部开启一个事务,执行完这俩条语句事务会自动关闭
//用户无法控制其内部的事务
await _myContext.AddAsync(new Bar() { Name = "c1" });
await _myContext.AddAsync(new Bar() { Name = "c2" });
await _myContext.SaveChangesAsync();
//单条 ef 命令不执行事务
await _myContext.AddAsync(new Bar() { Name = "c3" });
await _myContext.SaveChangesAsync();
//开启事务
await using var trans = await _myContext.Database.BeginTransactionAsync();
Console.WriteLine(
"myContext dbConnection:"+_myContext.Database.GetDbConnection().GetHashCode());
Console.WriteLine("myContext transaction:"+trans.TransactionId);
try {
//在事务内部,SaveChangesAsync 内部不会在开启事务,只做 db command 的执行
await _myContext.AddAsync(new Bar() { Name = "bar name" + DateTime.Now.ToString() });
await _myContext.SaveChangesAsync();
await using var nestedContext = new MyContext();
await using var nextedTrans =
await nestedContext.Database.BeginTransactionAsync();
try {
Console.WriteLine(
"nestedContext dbConnection:"+nestedContext.Database.GetDbConnection().GetHashCode());
Console.WriteLine(
"nestedContext transactionId:"+nextedTrans.TransactionId);
await nestedContext.
AddAsync(new Bar() { Name = "nested bar name" + DateTime.Now.ToString() });
//在事务内部,SaveChangesAsync 内部不会在开启事务,只做 db command 的执行
await nestedContext.SaveChangesAsync();
//提交事务
await nextedTrans.CommitAsync();
}
catch (Exception e) {
//如果需要内存的错误不影响到外层的事务,这里可以不抛出错误
await nextedTrans.RollbackAsync();
//不抛出错误,内层事务不会影响到外侧,抛出错误内层事务失败,外侧跟着一起失败
throw;
}
await _myContext.AddAsync(new Bar() { Name = "bar name" + DateTime.Now.ToString() });
//在事务内部,SaveChangesAsync 内部不会在开启事务,只做 db command 的执行
await _myContext.SaveChangesAsync();
await trans.CommitAsync();
}
catch (Exception e) {
// 使用了 using 语句,不需要在显示的使用 RollbackAsync 方法了
// 在 dispose 里,如果有错误会自动执行 RollbackAsync
//await trans?.RollbackAsync();
}
}
}
uow 模块
public class AbpUnitOfWorkModule : AbpModule
{
public override void PreConfigureServices(ServiceConfigurationContext context)
{
//扩展方法把 action 行为注册给 IObjectAccessor<ServiceRegistrationActionList>
//在使用 autofac 的 useAutofac
//的时候把 ServiceRegistrationActionList 提供的委托执行后生成的拦截器
//绑定给注入的服务,autofac 是 uow 的根本基础
context.Services.OnRegistered(UnitOfWorkInterceptorRegistrar.RegisterIfNeeded);
}
}
public static class UnitOfWorkInterceptorRegistrar
{
public static void RegisterIfNeeded(IOnServiceRegistredContext context)
{
if (ShouldIntercept(context.ImplementationType))
{
//增加 UnitOfWorkInterceptor 拦截器
context.Interceptors.TryAdd<UnitOfWorkInterceptor>();
}
}
private static bool ShouldIntercept(Type type)
{
// 排除不允许被拦截的类
//The Abp framework may enable interceptors for certain components
//(UOW, Auditing, Authorization, etc.), which requires dynamic proxy classes,
// but will cause application performance to decline.
//We need to use other methods for the controller to implement interception,
// such as middleware or MVC / Page filters.
//拦截器用多了会有性能问题, abp 默认会给 controller 添加拦截器,但是可以配置不添加
// 判断 实现类 是否有 UnitOfWorkAttribute 特性 或实现了 IUnitOfWorkEnabled 接口
return !DynamicProxyIgnoreTypes.Contains(type) &&
UnitOfWorkHelper.IsUnitOfWorkType(type.GetTypeInfo());
}
}
UnitOfWorkInterceptor
拦截器要继承 AbpInterceptor-->IAbpInterceptor
public class UnitOfWorkInterceptor : AbpInterceptor, ITransientDependency
{
//使用 IServiceScopeFactory 在 根作用域内生成一个子作用域,这样不受
// httpContext 请求的作用域影响
private readonly IServiceScopeFactory _serviceScopeFactory;
public UnitOfWorkInterceptor(IServiceScopeFactory serviceScopeFactory)
{
_serviceScopeFactory = serviceScopeFactory;
}
public override async Task InterceptAsync(IAbpMethodInvocation invocation)
{
//方法是否能被代理
if (!UnitOfWorkHelper.IsUnitOfWorkMethod(invocation.Method,
out var unitOfWorkAttribute))
{
await invocation.ProceedAsync();
return;
}
//开启作用域
using (var scope = _serviceScopeFactory.CreateScope())
{
var options = CreateOptions(scope.ServiceProvider, invocation,
unitOfWorkAttribute);
var unitOfWorkManager = scope.ServiceProvider.
GetRequiredService<IUnitOfWorkManager>();
//Trying to begin a reserved UOW by AbpUnitOfWorkMiddleware
if (unitOfWorkManager.TryBeginReserved(
UnitOfWork.UnitOfWorkReservationName, options))
{
await invocation.ProceedAsync();
if (unitOfWorkManager.Current != null)
{
await unitOfWorkManager.Current.SaveChangesAsync();
}
return;
}
using (var uow = unitOfWorkManager.Begin(options))
{
await invocation.ProceedAsync();
await uow.CompleteAsync();
}
}
}
private AbpUnitOfWorkOptions CreateOptions(IServiceProvider serviceProvider,
IAbpMethodInvocation invocation, [CanBeNull] UnitOfWorkAttribute unitOfWorkAttribute)
{
// AbpUnitOfWorkOptions 工作单元是否是数据库事务性的及相关事务性的属性
// 工作单元可以不是数据库事务性的,因为是数据库事务性的有可能找出数据库的锁行或表
var options = new AbpUnitOfWorkOptions();
unitOfWorkAttribute?.SetOptions(options);
if (unitOfWorkAttribute?.IsTransactional == null)
{
var defaultOptions = serviceProvider.
GetRequiredService<IOptions<AbpUnitOfWorkDefaultOptions>>().Value;
options.IsTransactional = defaultOptions.
CalculateIsTransactional(
autoValue: serviceProvider.
GetRequiredService<IUnitOfWorkTransactionBehaviourProvider>().
IsTransactional
?? !invocation.Method.Name.
StartsWith("Get", StringComparison.InvariantCultureIgnoreCase)
);
}
return options;
}
}
UnitOfWork 及 UnitOfWorkManager
IUnitOfWork
最为核心的 IUnitOfWork ,要拥有 IDatabaseApiContainer, ITransactionApiContainer 的能力
IDatabaseApi(空接口) 实现者分别是 EfCoreDatabaseApi 同时实现 ISupportsSavingChanges
,MongoDbDatabaseApi,MemoryDbDatabaseApi , ITransactionApi(提交事务的方法 CommitAsync)
实现者分别是 EfCoreTransactionApi,MongoDbTransactionApi,它们同时要实现 ISupportsRollback
ISupportsSavingChanges 提交,像 Mongodb 就不支持 SavingChanges
ISupportsRollback 支持了失败了支持回滚操作
IUnitOfWork 继承 IDatabaseApiContainer, ITransactionApiContainer 代表拥有了操作各种数据库 的能力及事务的能力具体能力如何体现要在 Volo.Abp.EntityFrameworkCore ,Volo.Abp.MongoDB; 这 俩个模块里定义
IDatabaseApiContainer 及 ITransactionApiContainer ,如果在一次工作单元里,有多种数据库操作 能力比如 EntityFrameworkCore 和 MongoDB ,需要一个容器来放置
创建工作单元,创建的过程中,可以形成父子关系,并且还能进行保留。保留在数据库事务里就是检查 点,检查点的好处是,在数据库回滚的时候可以指定回滚到哪个检查点,而不是全部取消
public interface IUnitOfWork : IDatabaseApiContainer, ITransactionApiContainer, IDisposable
{
Guid Id { get; }
Dictionary<string, object> Items { get; }
event EventHandler<UnitOfWorkFailedEventArgs> Failed;
event EventHandler<UnitOfWorkEventArgs> Disposed;
IAbpUnitOfWorkOptions Options { get; }
//此工作单元的上级工作单元
IUnitOfWork Outer { get; }
//是否是保留工作单元
bool IsReserved { get; }
bool IsDisposed { get; }
bool IsCompleted { get; }
//保留的每次
string ReservationName { get; }
void SetOuter([CanBeNull] IUnitOfWork outer);
void Initialize([NotNull] AbpUnitOfWorkOptions options);
void Reserve([NotNull] string reservationName);
Task SaveChangesAsync(CancellationToken cancellationToken = default);
Task CompleteAsync(CancellationToken cancellationToken = default);
Task RollbackAsync(CancellationToken cancellationToken = default);
void OnCompleted(Func<Task> handler);
void AddOrReplaceLocalEvent(
UnitOfWorkEventRecord eventRecord,
Predicate<UnitOfWorkEventRecord> replacementSelector = null
);
void AddOrReplaceDistributedEvent(
UnitOfWorkEventRecord eventRecord,
Predicate<UnitOfWorkEventRecord> replacementSelector = null
);
}
public interface IAmbientUnitOfWork : IUnitOfWorkAccessor
{
IUnitOfWork GetCurrentByChecking();
}
// AmbientUnitOfWork 暴漏当前执行的工作单元,及工作单元检查点
[ExposeServices(typeof(IAmbientUnitOfWork), typeof(IUnitOfWorkAccessor))]
public class AmbientUnitOfWork : IAmbientUnitOfWork, ISingletonDependency
{
public IUnitOfWork UnitOfWork => _currentUow.Value;
//在多线程环境下,获取当前的工作单元
private readonly AsyncLocal<IUnitOfWork> _currentUow;
public AmbientUnitOfWork()
{
_currentUow = new AsyncLocal<IUnitOfWork>();
}
public void SetUnitOfWork(IUnitOfWork unitOfWork)
{
_currentUow.Value = unitOfWork;
}
public IUnitOfWork GetCurrentByChecking()
{
var uow = UnitOfWork;
//Skip reserved unit of work
while (uow != null && (uow.IsReserved || uow.IsDisposed || uow.IsCompleted))
{
uow = uow.Outer;
}
return uow;
}
}
工作单元的嵌套关系,及设计原由在 ABP VNext 中,IUnitOfWork 接口是用于管理数据库事务的关键 接口。它定义了一组方法,用于开始、提交和回滚事务。在某些情况下,我们可能需要在一个事务中嵌 套另一个事务。这种嵌套事务的处理是通过使用数据库提供的保存点(Savepoint)机制来实现的。
- 外部工作单元和内部工作单元:外部工作单元是最外层的工作单元,通常代表一个完整的业务操作 或一个 HTTP 请求的范围。内部工作单元是内嵌在外部工作单元中的工作单元,通常代表内部的数 据库操作。
- 嵌套的开始和提交:外部工作单元通常由应用程序的顶级业务逻辑或请求处理器创建,并用于包装 整个业务操作。内部工作单元可以由外部工作单元的代码创建,以表示内部数据库操作。这样可以 将多个数据库操作组合在一起,以便它们在同一个事务中执行。内部工作单元的开始和提交是在外 部工作单元内部显式控制的。
- 嵌套的事务管理:外部工作单元通常开始一个数据库事务,并在外部工作单元的提交时提交该事务 。内部工作单元可以开始一个独立的数据库事务,而不影响外部工作单元的事务。这允许内部工作 单元在不影响外部工作单元的情况下执行自己的数据库操作。内部工作单元的提交操作将提交其内 部事务,但不会提交外部工作单元的事务。外部工作单元的提交操作将同时提交其内部和外部事务 ,以确保所有操作都在同一个事务中生效。
- 异常处理:如果内部工作单元发生异常,它可以选择回滚其内部事务,而不影响外部工作单元的事 务。,如果没有回滚操作就会影响到外部的事务如果外部工作单元发生异常,它会回滚其内部事务以 及外部事务,确保数据库的一致性。
- 用途:嵌套工作单元允许将多个数据库操作组合在一起,以实现更复杂的业务操作,而不会破坏数 据库的一致性。这在处理复杂的业务流程、事务性操作或需要将多个数据库操作原子性地组合在一 起的情况下非常有用。
比如 创建订单、减少库存俩个组件内部都已经写好了事务处理,现在要把它们组合在一起,可以使用 嵌套事务,将这些操作包装在外部事务内,以确保它们要么全部成功,要么全部失败
保留工作单元的设计:
通过 IUnitOfWorkManager.Reserve() 方法生成的工作单元就是保留工作单元,每个保留工作单元都需 要设置一个名称,Reserve 方法内部回向上查是否有同名的保留工作单元,如果有就用同名的之所以设 计保留工作单元,是防止在外部直接提交工作单元,因为通过 IUnitOfWorkManager.Current,不能获取 到保留工作单元工作单元
using (var uow = _unitOfWorkManager.Begin())
{
try
{
// 在外部事务中执行一些操作
using (var nestedUow = _unitOfWorkManager.Begin(TransactionScopeOption.RequiresNew))
{
try
{
// 在嵌套事务中执行一些操作
nestedUow.Complete();
}
catch (Exception ex)
{
// 处理嵌套事务中的异常
throw;
}
}
// 继续在外部事务中执行一些操作
uow.Complete();
}
catch (Exception ex)
{
// 处理外部事务中的异常
throw;
}
}
IUnitOfWorkManager
创建工作单元,创建的过程中,可以形成父子关系,并且还能进行保留。保留在数据库事务里就是检查 点,检查点的好处是,在数据库回滚的时候可以指定回滚到哪个检查点,而不是全部取消
public interface IUnitOfWorkManager
{
[CanBeNull]
IUnitOfWork Current { get; }
//如果 IAmbientUnitOfWork 有工作单元并且不需要新的,直接用当前的
//否则就创建新的,并把当前的作为新的父级,并把新的作为当前的工作单元
[NotNull]
IUnitOfWork Begin([NotNull] AbpUnitOfWorkOptions options, bool requiresNew = false);
//这个方法允许您创建一个工作单元,并为它指定一个保留名称,
//以便稍后通过该名称识别和获取工作单元。
//reservationName 参数表示要为工作单元指定的预留名称,以便稍后使用。
//requiresNew 参数允许您指定是否需要启动一个新的工作单元
//,而不管当前是否已存在具有相同预留名称的工作单元。
//如果 IAmbientUnitOfWork 中的工作单元是保留的,
//并且名称也是 reservationName ,直接使用 IAmbientUnitOfWork 中的
//如果创建新的,会把 IAmbientUnitOfWork 里的工作单元作为新工作单元的父级,
//并把创建的新的工作单元作为 IAmbientUnitOfWork
//创建一个工作单元,不需要新的,就返回旧的。 没有就创建新的,并把新的传递给 IAmbientUnitOfWork
//并对当前的工作单元进行命名保留
[NotNull]
IUnitOfWork Reserve([NotNull] string reservationName, bool requiresNew = false);
// BeginReserved 会顺着 ambientUnitOfWork.UnitOfWork
//一直往外侧找,直到最外侧找到为止,找到后,那此工作单元的预留属性取消
void BeginReserved([NotNull] string reservationName, [NotNull] AbpUnitOfWorkOptions options);
bool TryBeginReserved([NotNull] string reservationName,
[NotNull] AbpUnitOfWorkOptions options);
}
public static class UnitOfWorkHelper
{
// 成为工作单元的类或方法,必须是使用 UnitOfWorkAttribute 特性 或 继承
IUnitOfWorkEnabled 接口的类或对象
// 要么是用户使用 UnitOfWorkManager 自己管理,这个不是如下代码能处理的
public static bool IsUnitOfWorkType(TypeInfo implementationType)
{
// 类 实现了 UnitOfWorkAttribute 特性 或 类的任何方法有 UnitOfWorkAttribute 特性
if (HasUnitOfWorkAttribute(implementationType) ||
AnyMethodHasUnitOfWorkAttribute(implementationType))
{
return true;
}
// implementationType 继承了 IUnitOfWorkEnabled
if (typeof(IUnitOfWorkEnabled).GetTypeInfo().IsAssignableFrom(implementationType))
{
return true;
}
return false;
}
//检查方法或类是否能被代理, UnitOfWorkAttribute ,IUnitOfWorkEnabled,
//并返回 unitOfWorkAttribute
public static bool IsUnitOfWorkMethod([NotNull] MethodInfo methodInfo,
[CanBeNull] out UnitOfWorkAttribute unitOfWorkAttribute)
{
Check.NotNull(methodInfo, nameof(methodInfo));
//方法被标记了 UnitOfWorkAttribute 特性
var attrs = methodInfo.
GetCustomAttributes(true).OfType<UnitOfWorkAttribute>().ToArray();
if (attrs.Any())
{
unitOfWorkAttribute = attrs.First();
return !unitOfWorkAttribute.IsDisabled;
}
// 方法没有被标记 UnitOfWorkAttribute 特性
if (methodInfo.DeclaringType != null)
{
//类被标记了 UnitOfWorkAttribute 特性
attrs = methodInfo.DeclaringType.GetTypeInfo().
GetCustomAttributes(true).OfType<UnitOfWorkAttribute>().ToArray();
if (attrs.Any())
{
unitOfWorkAttribute = attrs.First();
return !unitOfWorkAttribute.IsDisabled;
}
//或者类继承了 IUnitOfWorkEnabled 接口
if (typeof(IUnitOfWorkEnabled).GetTypeInfo().
IsAssignableFrom(methodInfo.DeclaringType))
{
unitOfWorkAttribute = null;
return true;
}
}
unitOfWorkAttribute = null;
return false;
}
public static UnitOfWorkAttribute GetUnitOfWorkAttributeOrNull(MethodInfo methodInfo)
{
var attrs = methodInfo.GetCustomAttributes(true).
OfType<UnitOfWorkAttribute>().ToArray();
if (attrs.Length > 0)
{
return attrs[0];
}
attrs = methodInfo.DeclaringType.GetTypeInfo().
GetCustomAttributes(true).OfType<UnitOfWorkAttribute>().ToArray();
if (attrs.Length > 0)
{
return attrs[0];
}
return null;
}
private static bool AnyMethodHasUnitOfWorkAttribute(TypeInfo implementationType)
{
return implementationType
.GetMethods(BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic)
.Any(HasUnitOfWorkAttribute);
}
private static bool HasUnitOfWorkAttribute(MemberInfo methodInfo)
{
return methodInfo.IsDefined(typeof(UnitOfWorkAttribute), true);
}
}
AbpUnitOfWorkMiddleware 中间件全局的工作单元 保留事务的用于在源代码里
public class AbpUnitOfWorkMiddleware : IMiddleware, ITransientDependency
{
private readonly IUnitOfWorkManager _unitOfWorkManager;
private readonly AbpAspNetCoreUnitOfWorkOptions _options;
public AbpUnitOfWorkMiddleware(
IUnitOfWorkManager unitOfWorkManager,
IOptions<AbpAspNetCoreUnitOfWorkOptions> options)
{
_unitOfWorkManager = unitOfWorkManager;
_options = options.Value;
}
//
public async Task InvokeAsync(HttpContext context, RequestDelegate next)
{
if (IsIgnoredUrl(context))
{
await next(context);
return;
}
//保留的工作单元,这个工作单元通过 工作单元管理器不能获的在外部
using (var uow = _unitOfWorkManager.Reserve(UnitOfWork.UnitOfWorkReservationName))
{
await next(context);
await uow.CompleteAsync(context.RequestAborted);
}
}
private bool IsIgnoredUrl(HttpContext context)
{
return context.Request.Path.Value != null &&
_options.IgnoredUrls.Any(x => context.Request.Path.Value.StartsWith(x));
}
}
public class UnitOfWorkInterceptor : AbpInterceptor, ITransientDependency
{
private readonly IServiceScopeFactory _serviceScopeFactory;
public UnitOfWorkInterceptor(IServiceScopeFactory serviceScopeFactory)
{
_serviceScopeFactory = serviceScopeFactory;
}
public override async Task InterceptAsync(IAbpMethodInvocation invocation)
{
if (!UnitOfWorkHelper.IsUnitOfWorkMethod(invocation.Method,
out var unitOfWorkAttribute))
{
await invocation.ProceedAsync();
return;
}
using (var scope = _serviceScopeFactory.CreateScope())
{
var options = CreateOptions(scope.ServiceProvider,
invocation, unitOfWorkAttribute);
var unitOfWorkManager = scope.ServiceProvider.
GetRequiredService<IUnitOfWorkManager>();
//Trying to begin a reserved UOW by AbpUnitOfWorkMiddleware
if (unitOfWorkManager.TryBeginReserved(
UnitOfWork.UnitOfWorkReservationName, options))
{
await invocation.ProceedAsync();
if (unitOfWorkManager.Current != null)
{
await unitOfWorkManager.Current.SaveChangesAsync();
}
return;
}
using (var uow = unitOfWorkManager.Begin(options))
{
await invocation.ProceedAsync();
await uow.CompleteAsync();
}
}
}
private AbpUnitOfWorkOptions CreateOptions(IServiceProvider serviceProvider,
IAbpMethodInvocation invocation, UnitOfWorkAttribute? unitOfWorkAttribute)
{
var options = new AbpUnitOfWorkOptions();
unitOfWorkAttribute?.SetOptions(options);
if (unitOfWorkAttribute?.IsTransactional == null)
{
var defaultOptions = serviceProvider.
GetRequiredService<IOptions<AbpUnitOfWorkDefaultOptions>>().Value;
options.IsTransactional = defaultOptions.CalculateIsTransactional(
autoValue: serviceProvider.
GetRequiredService<IUnitOfWorkTransactionBehaviourProvider>().
IsTransactional
?? !invocation.Method.Name.StartsWith(
"Get", StringComparison.InvariantCultureIgnoreCase)
);
}
return options;
}
}
IDbContextProvider
namespace Volo.Abp.EntityFrameworkCore;
public interface IDbContextProvider<TDbContext>
where TDbContext : IEfCoreDbContext
{
[Obsolete("Use GetDbContextAsync method.")]
TDbContext GetDbContext();
Task<TDbContext> GetDbContextAsync();
}
public class UnitOfWorkDbContextProvider<TDbContext> : IDbContextProvider<TDbContext>
where TDbContext : IEfCoreDbContext
{
private const string TransactionsNotSupportedWarningMessage =
"Current database does not support transactions.
Your database may remain in an inconsistent state in an error case.";
public ILogger<UnitOfWorkDbContextProvider<TDbContext>> Logger { get; set; }
protected readonly IUnitOfWorkManager UnitOfWorkManager;
protected readonly IConnectionStringResolver ConnectionStringResolver;
protected readonly ICancellationTokenProvider CancellationTokenProvider;
protected readonly ICurrentTenant CurrentTenant;
protected readonly IEfCoreDbContextTypeProvider EfCoreDbContextTypeProvider;
public UnitOfWorkDbContextProvider(
IUnitOfWorkManager unitOfWorkManager,
IConnectionStringResolver connectionStringResolver,
ICancellationTokenProvider cancellationTokenProvider,
ICurrentTenant currentTenant,
IEfCoreDbContextTypeProvider efCoreDbContextTypeProvider)
{
UnitOfWorkManager = unitOfWorkManager;
ConnectionStringResolver = connectionStringResolver;
CancellationTokenProvider = cancellationTokenProvider;
CurrentTenant = currentTenant;
EfCoreDbContextTypeProvider = efCoreDbContextTypeProvider;
Logger = NullLogger<UnitOfWorkDbContextProvider<TDbContext>>.Instance;
}
public virtual async Task<TDbContext> GetDbContextAsync()
{
var unitOfWork = UnitOfWorkManager.Current;
if (unitOfWork == null)
{
throw new AbpException("A DbContext can only be created inside a unit of work!");
}
var targetDbContextType =
EfCoreDbContextTypeProvider.GetDbContextType(typeof(TDbContext));
var connectionStringName =
ConnectionStringNameAttribute.GetConnStringName(targetDbContextType);
var connectionString =
await ResolveConnectionStringAsync(connectionStringName);
var dbContextKey = $"{targetDbContextType.FullName}_{connectionString}";
var databaseApi = unitOfWork.FindDatabaseApi(dbContextKey);
if (databaseApi == null)
{
databaseApi = new EfCoreDatabaseApi(
await CreateDbContextAsync(
unitOfWork, connectionStringName, connectionString)
);
unitOfWork.AddDatabaseApi(dbContextKey, databaseApi);
}
return (TDbContext)((EfCoreDatabaseApi)databaseApi).DbContext;
}
protected virtual async Task<TDbContext>
CreateDbContextAsync(IUnitOfWork unitOfWork,
string connectionStringName, string connectionString)
{
var creationContext = new DbContextCreationContext(
connectionStringName, connectionString);
using (DbContextCreationContext.Use(creationContext))
{
var dbContext = await CreateDbContextAsync(unitOfWork);
if (dbContext is IAbpEfCoreDbContext abpEfCoreDbContext)
{
abpEfCoreDbContext.Initialize(
new AbpEfCoreDbContextInitializationContext(
unitOfWork
)
);
}
return dbContext;
}
}
protected virtual async Task<TDbContext> CreateDbContextAsync(IUnitOfWork unitOfWork)
{
return unitOfWork.Options.IsTransactional
? await CreateDbContextWithTransactionAsync(unitOfWork)
: unitOfWork.ServiceProvider.GetRequiredService<TDbContext>();
}
protected virtual async Task<TDbContext>
CreateDbContextWithTransactionAsync(IUnitOfWork unitOfWork)
{
var transactionApiKey = $"EntityFrameworkCore_{
DbContextCreationContext.Current.ConnectionString}";
var activeTransaction = unitOfWork.
FindTransactionApi(transactionApiKey) as EfCoreTransactionApi;
if (activeTransaction == null)
{
var dbContext = unitOfWork.ServiceProvider.GetRequiredService<TDbContext>();
try
{
var dbTransaction = unitOfWork.Options.IsolationLevel.HasValue
? await dbContext.Database.
BeginTransactionAsync(
unitOfWork.Options.IsolationLevel.Value, GetCancellationToken())
: await dbContext.Database.
BeginTransactionAsync(GetCancellationToken());
unitOfWork.AddTransactionApi(
transactionApiKey,
new EfCoreTransactionApi(
dbTransaction,
dbContext,
CancellationTokenProvider
)
);
}
catch (Exception e) when (
e is InvalidOperationException || e is NotSupportedException)
{
Logger.LogWarning(TransactionsNotSupportedWarningMessage);
return dbContext;
}
return dbContext;
}
else
{
DbContextCreationContext.Current.ExistingConnection =
activeTransaction.DbContextTransaction.GetDbTransaction().Connection;
var dbContext = unitOfWork.ServiceProvider.GetRequiredService<TDbContext>();
if (dbContext.As<DbContext>().HasRelationalTransactionManager())
{
if (dbContext.Database.GetDbConnection() ==
DbContextCreationContext.Current.ExistingConnection)
{
await dbContext.Database.UseTransactionAsync(
activeTransaction.DbContextTransaction.GetDbTransaction(),
GetCancellationToken());
}
else
{
try
{
/*User did not re-use the ExistingConnection and
we are starting a new transaction.
EfCoreTransactionApi will check the connection
string match and separately
commit/rollback this transaction over the DbContext instance. */
if (unitOfWork.Options.IsolationLevel.HasValue)
{
await dbContext.Database.BeginTransactionAsync(
unitOfWork.Options.IsolationLevel.Value,
GetCancellationToken()
);
}
else
{
await dbContext.Database.BeginTransactionAsync(
GetCancellationToken()
);
}
}
catch (Exception e) when (e is InvalidOperationException
||e is NotSupportedException)
{
Logger.LogWarning(TransactionsNotSupportedWarningMessage);
return dbContext;
}
}
}
else
{
try
{
/* No need to store the returning IDbContextTransaction
for non-relational databases
* since EfCoreTransactionApi will handle the
commit/rollback over the DbContext instance.
*/
await dbContext.Database.BeginTransactionAsync(GetCancellationToken());
}
catch (Exception e) when (e is InvalidOperationException
|| e is NotSupportedException)
{
Logger.LogWarning(TransactionsNotSupportedWarningMessage);
return dbContext;
}
}
activeTransaction.AttendedDbContexts.Add(dbContext);
return dbContext;
}
}
protected virtual async Task<string> ResolveConnectionStringAsync(
string connectionStringName)
{
// Multi-tenancy unaware contexts should always use the host connection string
if (typeof(TDbContext).IsDefined(typeof(IgnoreMultiTenancyAttribute), false))
{
using (CurrentTenant.Change(null))
{
return await ConnectionStringResolver.ResolveAsync(connectionStringName);
}
}
return await ConnectionStringResolver.ResolveAsync(connectionStringName);
}
}