利用AOP寫2PC框架(二),aop寫2pc框架
AOP的底層已經封裝好了以後,我們就要開始針對應用層寫具體的業務邏輯了。
也就是說我們需要有個類繼承於AopProxyBase,並且重寫其After,Bofore以達到我們的攔截記錄的功能。代碼如下:

![]()
public class TransactionProxy : AopProxyBase
{
public TransactionProxy(MarshalByRefObject obj, Type type)
: base(obj, type)
{ }
public override void Before(System.Runtime.Remoting.Messaging.IMessage requestMsg, AopMethodAttribute[] attrs)
{
}
public override void After(System.Runtime.Remoting.Messaging.IMessage requestMsg, System.Runtime.Remoting.Messaging.IMessage Respond, AopMethodAttribute[] attrs)
{
foreach (var attr in attrs)
{
if (attr is LogicRollBackTransAttribute)
{
return;
}
}
var args = requestMsg.Properties["__Args"] as object[];
string methodName = requestMsg.Properties["__MethodName"] as string;
CustomTransaction customTrans = null;
List<object> list = new List<object>();
customTrans = CallContext.GetData(TransKey.CustomTransKey) as CustomTransaction;
if (customTrans != null)
{
list.AddRange(args);
TransactionUnit unit = AppTransactionManage.Instance.GetRollBackInfo(methodName);
if (unit != null)
{
unit.Argments = list;
}
customTrans.Compensation.Add(unit);
CallContext.SetData(TransKey.CustomTransKey, customTrans);
var outArgs = Respond.Properties["__OutArgs"] as object[];
IDbTransaction dbTrans;
foreach (var attr in attrs)
{
if (attr is DbTransAttribute || attr is LogicTransAttribute)
{
if (outArgs != null)
{
foreach (var arg in outArgs)
{
if (arg is IDbTransaction)
{
dbTrans = arg as IDbTransaction;
if (customTrans != null)
{
customTrans.AddDbTransaction(dbTrans);
}
}
}
}
}
}
}
}
}
View Code
在After的地方,我們可以看到,我們做了一次LogicRollBackTransAttribute的判定,避免在回調的時候,又再走一次攔截和記錄的流程。
同時做了DbTransAttribute和LogicTransAttribute的判定。因為我把事務分為兩類,一類是db本身自己控制的,可以直接rollback的,一類是logic的,需要我們去手動通過邏輯回滾的。代碼如下:

![]()
[AttributeUsage(AttributeTargets.Method)]
public class LogicTransAttribute : AopMethodAttribute
{
public string MethodName { get; set; }
public LogicTransAttribute()
{
}
public LogicTransAttribute(string name)
{
this.MethodName = name;
}
}
[AttributeUsage(AttributeTargets.Method)]
public class DbTransAttribute : AopMethodAttribute
{
}
View Code
同時可以看到,我把每一個函數的調用作為一個單元,用TransactionUnit類來保存,代碼如下:

![]()
public class TransactionUnit
{
public object InstanceObject;
/// <summary>
/// 執行的方法
/// </summary>
public MethodInfo Forward;
/// <summary>
/// 失敗回滾的方法
/// </summary>
public MethodInfo Rollback;
/// <summary>
/// 參數
/// </summary>
public IList<object> Argments;
}
View Code
因為,一個事務裡面,可能包含了多次操作redis,或者多次操作db,為了保證線程安全,同時又需要避開鎖,我用了CallContext將一個線程裡面的一段事務,保存在其線程上下文中。在保存一個完整的TransactionUnit的時候,不可能每一次都去通過反射去取MethodInfo,所以又增加了一段初始化和字典來保存其MethodInfo。代碼如下:

![]()
public class AppTransactionManage
{
private Dictionary<string, TransactionUnit> _transMaps;
static AppTransactionManage() { }
private AppTransactionManage()
{
if (this._transMaps == null)
{
this._transMaps = new Dictionary<string, TransactionUnit>();
}
}
private static AppTransactionManage _instance;
public static AppTransactionManage Instance
{
get
{
if (_instance == null)
{
_instance = new AppTransactionManage();
}
return _instance;
}
}
public TransactionUnit GetRollBackInfo(string methodName)
{
if (this._transMaps == null) throw new ArgumentNullException("not init");
if (this._transMaps.ContainsKey(methodName))
{
return this._transMaps[methodName];
}
return null;
}
public void Init(params string[] assembly)
{
if (assembly != null)
{
foreach (string s in assembly)
{
var ass = Assembly.Load(s);
if (ass != null)
{
var types = ass.GetTypes();
foreach (var type in types)
{
var transAttr = type.GetCustomAttribute(typeof(TransactionAttribute), false) as TransactionAttribute;
if (transAttr != null)
{
var methods = type.GetMethods();
foreach (var method in methods)
{
var forwardTrans = method.GetCustomAttribute(typeof(LogicTransAttribute), false) as LogicTransAttribute;
var rollbackTrans = method.GetCustomAttribute(typeof(LogicRollBackTransAttribute), false) as LogicRollBackTransAttribute;
TransactionUnit unit;
if (forwardTrans != null)
{
if (!this._transMaps.TryGetValue(forwardTrans.MethodName, out unit))
{
unit = new TransactionUnit();
}
unit.Forward = method;
unit.InstanceObject = Activator.CreateInstance(type);
this._transMaps[forwardTrans.MethodName] = unit;
}
if (rollbackTrans != null)
{
if (!this._transMaps.TryGetValue(rollbackTrans.MethodName, out unit))
{
unit = new TransactionUnit();
}
unit.Rollback = method;
unit.InstanceObject = Activator.CreateInstance(type);
this._transMaps[rollbackTrans.MethodName] = unit;
}
}
}
}
}
}
}
}
}
View Code
為了友好開發者的調用,可以讓其像使用SqlTransaction一樣來使用,我又對外公開了一個CustomTranstion,將調用方式封裝在這個類裡面,代碼如下:

![]()
public class CustomTransaction : IDisposable
{
private List<IDbTransaction> _dbTransactions;
private bool _isRollBack = true;
/// <summary>
/// 補償機制
/// </summary>
public List<TransactionUnit> Compensation;
public void Commit()
{
if (this._dbTransactions != null)
{
this._dbTransactions.ForEach((t) => t.Commit());
}
this._isRollBack = false;
}
public void RollBack()
{
if (this.Compensation != null)
{
this.Compensation.ForEach((t) =>
{
object[] paramsArray = t.Argments == null ? null : t.Argments.ToArray();
t.Rollback.Invoke(t.InstanceObject, paramsArray);
});
}
if (this._dbTransactions != null)
{
this._dbTransactions.ForEach((t) => t.Rollback());
}
}
private bool _isRetry = true;
public CustomTransaction(bool isRetry = true)
{
this._isRetry = isRetry;
if (this._dbTransactions == null)
{
this._dbTransactions = new List<IDbTransaction>();
}
if (this.Compensation == null)
{
this.Compensation = new List<TransactionUnit>();
}
CallContext.SetData(TransKey.CustomTransKey, this);
}
public void AddDbTransaction(IDbTransaction transaction)
{
this._dbTransactions.Add(transaction);
}
public void Dispose()
{
if (this._isRollBack)
{
this.RollBack();
}
CallContext.FreeNamedDataSlot(TransKey.CustomTransKey);
}
}
View Code
這個時候,你就可以像是用SqlTransaction一樣去Using(var trans = new CustomTranstion()){}然後在using裡面去寫trans.Commit();來提交所有的事務操作,如果不做Commit操作的話,在CustomTranstion裡面,會自動去調用其rollback()操作。
但是這並沒有完,所有的只是記錄下來了,但是並沒有保存到DB去做持久化。這個時候就需要增加一個隊列,來不斷的去將TransactionUnit來保存到db,同時又需要把隊列去做持久化,避免一些意外原因,導致隊列數據丟失,而缺失了這部分的日志記錄(雖然我個人認為這一部分可以省略)。代碼如下:

![]()
[Serializable]
public class TransQueue : IDisposable
{
public Queue<Action> _transQueue;
private Thread _thread;
private bool _isDispose;
public delegate void PersistenceHandler(Action[] actions);
PersistenceHandler persistenceHandler;
private readonly object _syncObject = new object();
public TransQueue()
{
if (_transQueue == null)
{
_transQueue = new Queue<Action>();
}
if (persistenceHandler == null)
{
persistenceHandler = PersistenceToDisk;
}
if (_thread == null)
{
_thread = new Thread(Thread_Work)
{
IsBackground = true
};
}
_thread.Start();
}
public void Push(Action action)
{
if (_transQueue == null) throw new ArgumentNullException("transQueue is not init");
lock (_syncObject)
{
_transQueue.Enqueue(action);
}
}
public void Thread_Work()
{
while (!_isDispose)
{
Action[] items = null;
if (_transQueue != null && _transQueue.Count > 0)
{
lock (_syncObject)
{
items = new Action[_transQueue.Count];
_transQueue.CopyTo(items, 0);
}
}
if (items != null && items.Length > 0)
{
persistenceHandler.BeginInvoke(items, PersistenceHandlerCallBack, persistenceHandler);
foreach (var item in items)
{
item.Invoke();
}
}
}
}
public void PersistenceHandlerCallBack(IAsyncResult result)
{
try
{
(result.AsyncState as PersistenceHandler).EndInvoke(result);
}
catch (Exception e)
{
}
}
public void PersistenceToDisk(Action[] items)
{
BinaryHelper.SaveToFile(items);
}
public void Dispose()
{
_isDispose = true;
_thread.Join();
}
}
public class TransQueueManage
{
private int _threadNumber = 2;
private TransQueue[] _transQueue;
Random random = new Random();
private TransQueueManage()
{
if(_transQueue == null)
{
_transQueue = new TransQueue[_threadNumber];
for (var i = 0; i < _threadNumber; i++)
{
_transQueue[i] = new TransQueue();
}
}
}
static TransQueueManage()
{
}
private static readonly object _syncObject = new object();
private static TransQueueManage _instance;
public static TransQueueManage Instance
{
get
{
if (_instance == null)
{
lock (_syncObject)
{
if (_instance == null)
{
_instance = new TransQueueManage();
}
}
}
return _instance;
}
}
public void Push(Action action)
{
var index = GetRandomThreadIndex();
_transQueue[index].Push(action);
}
public int GetRandomThreadIndex()
{
return random.Next(0, _threadNumber);
}
}
View Code
簡述Spring框架中IOC與AOP
IoC就是對象的創建,依賴都由Spring及配置文件控制
AOP就是統一的給一些類似的方法加上同樣的功能,比如日志,事務
利用Spring的AOP實現權限控制
用java動態代理就可以搞定了
建議好好看看代理模式