Enterprise Library深入解析與靈活應用(2):通過SqlDependency實現Cache和Database的同步
對於一個真正的企業級的應用來說,Caching肯定是一個不得不考慮的因素,合理、有效地利用Caching對於增強應用的Performance(減少 對基於Persistent storage的IO操作)、Scalability(將數據進行緩存,減輕了對Database等資源的壓力)和Availability(將數據進行緩存 ,可以應對一定時間內的網絡問題、Web Service不可訪問問題、Database的崩潰問題等等)。Enterprise Library的Caching Application Block為我們提供了一個易用的、可擴展的實現Caching的框架。借助於Caching Application Block,Administrator和Developer很容易實現基 於Caching的管理和編程。由於Caching的本質在於將相對穩定的數據常駐內存,以避免對Persistent storage的IO操作的IO操作,所以有兩個 棘手的問題:Load Balance問題;Persistent storage和內存中數據同步的問題。本篇文章提供了一個解決方案通過SqlDependency實現SQL Server中的數據和Cache同步的問題。
一、Cache Item的過期策略
在默認的情況下,通過CAB(以下對Caching Application Block的簡稱,注意不是Composite UI Application Block )的CacheManager加入的cache item是永不過期的;但是CacheManager允許你在添 加cache item的時候通過一個ICacheItemExpiration對象應用不同的過期策略。CAB定了一個以下一些class實現了ICacheItemExpiration,以 提供不同的過期方式:
AbsoluteTime:為cache item設置一個cache item的絕對過期時間。
ExtendedFormatTime:通過一個表 達式實現這樣的過期策略:每分鐘過期(*****:5個*分別代表minute、hour、date、month、year);每個小時的第5分鐘過期(5****);每 個月的2號零點零分過期(0 0 2 * *)。
FileDependency:將cache item和一個file進行綁定,通過檢測file的最後更新時間確定file 自cache item被添加以來是否進行過更新。如果file已經更新則cache item過期。
NeverExpired:永不過期。
SlidingTime:一 個滑動的時間,cache item的每次獲取都將生命周期延長到設定的時間端,當cache item最後一次獲取的時間算起,超出設定的時間,則cache item過期。
對於過期的cache item,會及時地被清理。所以要實現我們開篇提出的要求:實現Sql Server中的數據和Cache中的數據實 現同步,我們可以通過創建基於Sql Server數據變化的cache item的過期策略。換句話說,和FileDependency,當Persistent storage (Database)的數據變化本檢測到之後,對於得cache自動過期。但是,對於文件的修改和刪除,我們和容易通過文件的最後更新日期或者是否 存在來確定。對於Database中Table數據的變化的探測就不是那麼簡單了。不過SQL Server提供了一個SqlDependency的組建幫助我們很容易地 實現了這樣的功能。
二、創建基於SqlDependency的ICacheItemExpiration
SqlDependency是建立在SQL Server 2005的Service Broker之上。SqlDependency向SQL Server訂閱一個Query Notification。當SQL Server檢測到基於該Query的數據發生變化,向SqlDependency 發送一個Notification,並觸發SqlDependency的Changed事件,我們就可以通過改事件判斷對應的cache item是否應該過期。
我們現在 就來創建這樣的一個ICacheItemExpiration。我們先看看ICacheItemExpiration的的定義:
public interface ICacheItemExpiratioN
{
// Methods
bool HasExpired();
void Initialize(CacheItem owningCacheItem);
void Notify();
}
而判斷過期的依據就是根據HasExpired方法,我們自定義的 CacheItemExpiration就是實現了該方法,根據SqlDependency判斷cache item是否過期。下面是SqlDependencyExpiration的定義(注: SqlDependencyExpiration的實現通過Enterprise Library DAAB實現DA操作):
namespace Artech.SqlDependencyCaching
{
public class SqlDependencyExpiration : ICacheItemExpiratioN
{
private static readonly CommandType DefaultComamndType = CommandType.StoredProcedure;
public event EventHandler Expired;
public bool HasChanged
{ get; set; }
public string ConnectionName
{ get; set; }
public SqlDependencyExpiration(string commandText, IDictionary<string, object> parameters) :
this(commandText, DefaultComamndType, string.Empty, parameters)
{ }
public SqlDependencyExpiration (string commandText, string connectionStringName, IDictionary<string, object> parameters) :
this (commandText, DefaultComamndType, connectionStringName, parameters)
{ }
public SqlDependencyExpiration(string commandText, CommandType commandType, IDictionary<string, object> parameters) :
this(commandText, commandType, string.Empty, parameters)
{ }
public SqlDependencyExpiration(string commandText, CommandType commandType, string connectionStringName, IDictionary<string, object> parameters)
{
if (string.IsNullOrEmpty(connectionStringName))
{
this.ConnectionName = DatabaseSettings.GetDatabaseSettings(ConfigurationSourceFactory.Create ()).DefaultDatabase;
}
else
{
this.ConnectionName = connectionStringName;
}
SqlDependency.Start(ConfigurationManager.ConnectionStrings [this.ConnectionName].ConnectionString);
using (SqlConnection sqlConnection = DatabaseFactory.CreateDatabase(this.ConnectionName).CreateConnection() as SqlConnection)
{
SqlCommand command = new SqlCommand(commandText, sqlConnection);
command.CommandType = commandType;
if (parameters != null)
{
this.AddParameters (command, parameters);
}
SqlDependency dependency = new SqlDependency(command);
dependency.OnChange += delegate
{
this.HasChanged = true;
if (this.Expired != null)
{
this.Expired(this, new EventArgs());
}
};
if (sqlConnection.State != ConnectionState.Open)
{
sqlConnection.Open();
}
command.ExecuteNonQuery();
}
}
private void AddParameters(SqlCommand command, IDictionary<string, object> parameters)
{
command.Parameters.Clear();
foreach (var parameter in parameters)
{
string parameterName = parameter.Key;
if (!parameter.Key.StartsWith("@"))
{
parameterName = "@" + parameterName;
}
command.Parameters.Add(new SqlParameter(parameterName, parameter.Value));
}
}
#region ICacheItemExpiration Members
public bool HasExpired()
{
bool indicator = this.HasChanged;
this.HasChanged = false;
return indicator;
}
public void Initialize(CacheItem owningCacheItem)
{ }
public void Notify()
{ }
#endregioN
}
}
我們來簡單分析一下實現過程,先看看Property定義:
private static readonly CommandType DefaultComamndType = CommandType.StoredProcedure;
public event EventHandler Expired;
public bool HasChanged
{ get; set; }
public string ConnectionName
{ get; set; }
通過DefaultComamndType 定義了默認的CommandType,在這了我默認使用Stored Procedure;Expired event將在cache item過期時觸發;HasChanged代表Database的數 據是否被更新,將作為cache過期的依據;ConnectionName代表的是Connection string的名稱。
為了使用上的方便,我定義了4個重載 的構造函數,最後的實現定義在public SqlDependencyExpiration(string commandText, CommandType commandType, string connectionStringName, IDictionary<string, object> parameters)。parameters代表commandText的參數列表,key為參數名稱, value為參數的值。首先獲得真正的connection string name(如果參數connectionStringName為空,就使用DAAB默認的connection string)
if (string.IsNullOrEmpty(connectionStringName))
{
this.ConnectionName = DatabaseSettings.GetDatabaseSettings(ConfigurationSourceFactory.Create()).DefaultDatabase;
}
else
{
this.ConnectionName = connectionStringName;
}
然後通過調用SqlDependency.Start()方法,並傳入connection string 作為參數。該方法將創建一個Listener用於監聽connection string代表的database instance發送過來的query notifucation。
SqlDependency.Start(ConfigurationManager.ConnectionStrings[this.ConnectionName].ConnectionString);
然後創建 SqlConnection,並根據CommandText和CommandType參數創建SqlCommand對象,並將參數加入到command的參數列表中。最後將這個SqlCommand 對象作為參數創建SqlDependency 對象,並注冊該對象的OnChange 事件(對HasChanged 賦值;並觸發Expired事件)。這樣當我們執行該 Cmmand之後,當基於commandtext的select sql語句獲取的數據在database中發生變化(添加、更新和刪除),SqlDependency 的OnChange 將 會觸發
SqlDependency dependency = new SqlDependency(command);
dependency.OnChange += delegate
{
this.HasChanged = true;
if (this.Expired != null)
{
this.Expired(this, new EventArgs());
}
};
這樣在HasExpired方法中,就可以根據HasChanged 屬性判斷cache item是否應該過期 了。
public bool HasExpired()
{
bool indicator = this.HasChanged;
this.HasChanged = false;
return indicator;
}
三、如何應用SqlDependencyExpiration
我們現在創建一個簡單的Windows Application來模擬使用我們創建的SqlDependencyExpiration。我們模擬一個簡單的場景:假設我們有一個功能需要向系統所有的user發送通 知,而且不同的user,通知是不一樣的,由於通知的更新的頻率不是很高,我們需要講某個User的通知進行緩存。
這是我們的表結構: Messages
我們通過下面的SP來獲取基於某個 User 的Message:
ALTER PROCEDURE [dbo].[Message_Select_By_User]
(@UserID VarChar(50))
AS
BEGIN
Select ID, UserID, [Message] From dbo.Messages Where UserID = @UserID
END
注:如何寫成 Select * From dbo.Messages Where UserID = @UserID, SqlDependency 將不能正常運行;同
時Table的schema(dbo)也是必須的。
我們設計如下的界面來模擬:通過Add按鈕,可以為選擇的User創建新的Message,而下面的List將顯示基於某個User(Foo)的Message List。該列表的獲取方式基於Lazy Loading的方式,如果在Cache中,則直接從Cache中獲取,否則從Db中獲取,並將獲取的數據加入cache。
我們先定義了3個常量,分別表示:緩存 message針對的User,獲取Message list的stored procedure名稱和Cache item的key。
private const string UserName = "Foo";
private const string MessageCachingProcedure = "Message_Select_By_User";
private const string CacheKey = "__MessageOfFoo";
我們通過一個Property來創建或獲取我們的上面定義的 SqlDependencyExpiration 對象
private SqlDependencyExpiration CacheItemExpiratioN
{
get
{
IDictionary<string, object> parameters = new Dictionary<string, object>();
parameters.Add("UserID", UserName);
SqlDependencyExpiration expiration= new SqlDependencyExpiration (MessageCachingProcedure, parameters);
expiration.Expired += delegate
{
MessageBox.Show("Cache has expired!");
};
return expiration;
}
}
通過GetMessageByUser從數據庫中獲取基於某個User的Message List(使用了DAAB):
private List<string> GetMessageByUser(string userName)
{
List<string> messageList = new List<string> ();
Database db = DatabaseFactory.CreateDatabase();
DbCommand command = db.GetStoredProcCommand (MessageCachingProcedure);
db.AddInParameter(command, "UserID", DbType.String, userName);
IDataReader reader = db.ExecuteReader(command);
while (reader.Read())
{
messageList.Add (reader.GetString(2));
}
return messageList;
}
通過GetMessages獲取User(Foo)的Message List:首先通過CacheManager檢測message list是否存在於Cache,如何不存在,調用上面的GetMessageByUser方法從database中獲取Foo的 message list。並將其加入Cache中,需要注意的是這裡使用到了我們的SqlDependencyExpiration 對象。
private List<string> GetMessages()
{
ICacheManager manager = CacheFactory.GetCacheManager();
if (manager.GetData(CacheKey) == null)
{
manager.Add(CacheKey, GetMessageByUser(UserName), CacheItemPriority.Normal, null, this.CacheItemExpiration);
}
return manager.GetData(CacheKey) as List<string>;
}
由於在我們的例子中需要對DB進行數據操作,來檢測數據的變換是否應用Cache的過期,我們需要 想數據庫中添加Message。我們通過下面的方式現在message的添加。
private void CreateMessageEntry(string userName, string message)
{
Database db = DatabaseFactory.CreateDatabase();
string insertSql = "INSERT INTO [dbo].[Messages]([UserID],[Message])VALUES(@userID, @message)";
DbCommand command = db.GetSqlStringCommand (insertSql);
db.AddInParameter(command, "userID", DbType.String, userName);
db.AddInParameter (command, "message", DbType.String, message);
db.ExecuteNonQuery(command);
}
我們的Add按 鈕的實現如下:基於我們選擇的Username和輸入的message的內容向DB中添加Message,然後調用GetMessages()方法獲取基於用戶Foo的Message 列表。之所以要在兩者之間將線程休眠1s,是為了上SqlDependency有足夠的時間結果從Database傳過來的Query Notification,並觸發 OnChanged事件並執行相應的Event Handler,這樣調用GetMessages時檢測Cache才能檢測到cache item已經過期了。
private void buttonAdd_Click(object sender, EventArgs e)
{
this.CreateMessageEntry (this.comboBoxUserName.SelectedValue.ToString(), this.textBoxMessage.Text.Trim());
Thread.Sleep(1000);
this.listBoxMessage.DataSource = this.GetMessages();
}
由於我們緩存了用戶Foo的Message list,所以當我們為Foo 創建Message的時候,下面的ListBox的列表能夠及時更新,這表明我們的cache item已經過期了。而我們為其他的用戶(Bar,Baz)創建 Message的時候,cache item將不會過期,這一點我們可以通過彈出的MessageBox探測掉(expiration.Expired += delegate MessageBox.Show("Cache has expired!");}; ),只有前者才會彈出下面的MessageBox:
注:由於SqlDependency建立在Service Broker之上的,所 以我們必須將service Broker開關打開(默認使關閉的)。否則我們將出現下面的錯誤:
打開service Broker可以通過如下的T-SQL:ALTER DATABASE MyDb SET ENABLE_BROKER ;
本文配套源碼