SQL Server 2008 引入了更改跟蹤,這是一種輕量型解決方案,它為應用程序提供了一種有效的更改跟蹤機制。通常,若要使應用程序能夠查詢對數據庫中的數據所做的更改和訪問與這些更改相關的信息,應用程序開發人員必須實現自定義更改跟蹤機制。創建這些機制通常涉及多項工作,並且常常涉及使用觸發器、timestamp 列和新表組合來存儲跟蹤信息,同時還會涉及使用自定義清除過程。
通過更改跟蹤,可以很容易地編寫同步數據的應用,下面是一個使用更改跟蹤實現單向數據同步的示例。
1. 建立示例環境
-- ====================================================
-- 測試的數據庫
USE master;
GO
CREATE DATABASE DB_test;
GO
-- 啟用更改跟蹤
ALTER DATABASE DB_test SET
CHANGE_TRACKING = ON
(
AUTO_CLEANUP = ON, -- 打開自動清理選項
CHANGE_RETENTION = 1 HOURS -- 數據保存期為1 時
);
ALTER DATABASE DB_test SET
ALLOW_SNAPSHOT_ISOLATION ON; -- 允許在測試數據庫中使用 SNAPSHOT 事務隔離級別
GO
-- ====================================================
-- 測試的表
USE DB_test;
GO
-- a. 同步的源表
CREATE TABLE dbo.tb_source
(
pk_id int IDENTITY
PRIMARY KEY,
col1 int,
col2 varchar(10),
col3 nvarchar(max),
col4 xml
);
GO
-- 啟用更改跟蹤
ALTER TABLE dbo.tb_source
ENABLE CHANGE_TRACKING
WITH
(
TRACK_COLUMNS_UPDATED = ON -- 記錄UPDATE 的列信息
);
GO
-- b. 同步的目錄表
CREATE TABLE dbo.tb_Target
(
pk_id int
PRIMARY KEY,
col1 int,
col2 varchar(10),
col3 nvarchar(max),
col4 xml
);
GO
-- 記錄同步情況的表
CREATE TABLE dbo.tb_Change_Tracking
(
id int IDENTITY
PRIMARY KEY,
object_name sysname UNIQUE,
last_sync_version bigint,
last_update_date datetime
);
GO
2. 實現同步處理的存儲過程
-- ====================================================
-- 數據同步處理的存儲過程
USE DB_test;
GO
-- 數據同步的存儲過程- 同步未更新的數據
-- 單次更新,更新完成後退出
CREATE PROC dbo.p_SyncChangeData_tb_Srouce_Target
@last_sync_version bigint = NULL OUTPUT,
@min_valid_version bigint = NULL OUTPUT
AS
SET NOCOUNT ON;
-- ========================================
-- TRY...CATCH 中的標准事務處理模塊
-- a. 當前的事務數
DECLARE @__trancount int;
SELECT @__trancount = @@TRANCOUNT;
-- TRY...CATCH 處理
BEGIN TRY
-- ========================================
-- 源表信息
DECLARE @object_name sysname,@object_id int;
SELECT @object_name = N'dbo.tb_source',
@object_id = OBJECT_ID(@object_name);
-- ========================================
-- 最後一次同步的版本
IF @last_sync_version IS NULL
BEGIN
SELECT
@last_sync_version = last_sync_version
FROM dbo.tb_Change_Tracking
WHERE object_name = @object_name;
IF @@ROWCOUNT = 0
BEGIN
SET @last_sync_version = CHANGE_TRACKING_MIN_VALID_VERSION(@object_id);
INSERT dbo.tb_Change_Tracking
(
object_name,
last_sync_version
)
VALUES
(
@object_name,
@last_sync_version
);
END;
END;
-- ========================================
-- TRY...CATCH 中的標准事務處理模塊
-- b. 開啟事務, 或者設置事務保存點
SET TRANSACTION ISOLATION LEVEL SNAPSHOT; -- 使用快照隔離級別的事務
IF @__trancount = 0
BEGIN TRAN;
ELSE
SAVE TRAN __TRAN_SavePoint;
-- ========================================
-- 版本驗證
-- a. 驗證是否有數據變更(如果上次同步的版本號= 當前數據庫的最大版本號,則視為無數據變化)
IF @last_sync_version = CHANGE_TRACKING_CURRENT_VERSION()
GOTO lb_Return;
-- b. 驗證同步的版本號是否有效(如果上次同步的版本號< 當前可用的最小版本號,則視為無效)
IF @last_sync_version < CHANGE_TRACKING_MIN_VALID_VERSION(@object_id)
BEGIN
SET @min_valid_version = CHANGE_TRACKING_MIN_VALID_VERSION(@object_id);
GOTO lb_Return;
END;
-- c. 驗證同步的版本號是否有效(如果上次同步的版本號> 當前數據庫的最大版本號,則視為無效)
IF @last_sync_version > CHANGE_TRACKING_CURRENT_VERSION()
BEGIN
SET @last_sync_version = NULL;
GOTO lb_Return;
END;
-- ========================================
-- 同步數據
-- a. 插入
WITH
CHG AS
(
SELECT DATA.*
FROM dbo.tb_source DATA
INNER JOIN
CHANGETABLE(CHANGES dbo.tb_source, @last_sync_version) CHG
ON CHG.pk_id = DATA.pk_id
WHERE CHG.SYS_CHANGE_OPERATION = N'I'
)
INSERT dbo.tb_Target
SELECT * FROM CHG;
-- b. 刪除
WITH
CHG AS
(
SELECT CHG.*
FROM CHANGETABLE(CHANGES dbo.tb_source, @last_sync_version) CHG
WHERE CHG.SYS_CHANGE_OPERATION = N'D'
)
DELETE DATA
FROM dbo.tb_Target DATA
INNER JOIN CHG
ON CHG.pk_id = DATA.pk_id;
-- c. 更新
WITH
COL AS
(
SELECT *
FROM
(
SELECT name, column_id
FROM sys.columns
WHERE object_id = @object_id
) DATA
PIVOT
(
MAX(column_id)
FOR name IN( -- 源表的所有列的列表
[col1], [col2], [col3], [col4])
)P
),
CHG AS
(
SELECT DATA.*,__SYS_CHANGE_COLUMNS = CHG.SYS_CHANGE_COLUMNS
FROM dbo.tb_source DATA INNER JOIN
CHANGETABLE(CHANGES dbo.tb_source, @last_sync_version) CHG
ON CHG.pk_id = DATA.pk_id
WHERE CHG.SYS_CHANGE_OPERATION = N'U'
)
UPDATE DATA SET -- 判斷列是否需要更新(也可以不判斷,直接更新所有的列)
[col1] = CASE
WHEN CHG.__SYS_CHANGE_COLUMNS IS NULL OR CHANGE_TRACKING_IS_COLUMN_IN_MASK(COL.[col1], CHG.__SYS_CHANGE_COLUMNS) = 1
THEN CHG.[col1]
ELSE DATA.[col1]
END,
[col2] = CASE
WHEN CHG.__SYS_CHANGE_COLUMNS IS NULL OR CHANGE_TRACKING_IS_COLUMN_IN_MASK(COL.[col2], CHG.__SYS_CHANGE_COLUMNS) = 1
THEN CHG.[col2]
ELSE DATA.[col2]
END,
[col3] = CASE
WHEN CHG.__SYS_CHANGE_COLUMNS IS NULL OR CHANGE_TRACKING_IS_COLUMN_IN_MASK(COL.[col3], CHG.__SYS_CHANGE_COLUMNS) = 1
THEN CHG.[col3]
ELSE DATA.[col3]
END,
[col4] = CASE
WHEN CHG.__SYS_CHANGE_COLUMNS IS NULL OR CHANGE_TRACKING_IS_COLUMN_IN_MASK(COL.[col4], CHG.__SYS_CHANGE_COLUMNS) = 1
THEN CHG.[col4]
ELSE DATA.[col4]
END
FROM dbo.tb_Target DATA
INNER JOIN CHG
ON CHG.pk_id = DATA.pk_id
CROSS JOIN COL;
-- ========================================
-- 更新最後一次同步的版本號
UPDATE dbo.tb_Change_Tracking SET
@last_sync_version = CHANGE_TRACKING_CURRENT_VERSION(),
last_sync_version = @last_sync_version,
last_update_date = GETDATE();
-- ========================================
-- TRY...CATCH 中的標准事務處理模塊
-- c. 提交事務
-- 有可提交的事務, 並且事務是在當前模塊中開啟的情況下, 才提交事務
IF XACT_STATE() = 1 AND @__trancount = 0
COMMIT;
lb_Return:
-- ========================================
-- TRY...CATCH 中的標准事務處理模塊
-- d. 為了防止TRY 塊中有遺漏的事務處理, 在TRY 模塊的結束部分做最終的事務處理
IF @__trancount = 0
BEGIN
IF XACT_STATE() = -1
ROLLBACK TRAN;
ELSE
BEGIN
WHILE @@TRANCOUNT > 0
COMMIT TRAN;
END;
END;
END TRY
BEGIN CATCH
-- ========================================
-- TRY...CATCH 中的標准事務處理模塊
-- e. CATCH 塊的事務回滾
IF XACT_STATE() <> 0
BEGIN
IF @__trancount = 0
ROLLBACK TRAN;
-- XACT_STATE 為-1 時, 不能回滾到事務保存點, 這種情況留給外層調用者做統一的事務回滾
-- 通過@@TRANCOUNT > @__trancount 的判斷, 即使在TRY 模塊中沒有設置事務保存點的情況下跳到此步驟, 也不會出錯
ELSE IF XACT_STATE() = 1 AND @@TRANCOUNT > @__trancount
ROLLBACK TRAN __TRAN_SavePoint;
END;
-- ========================================
-- 錯誤消息處理
-- a. 獲取錯誤信息
-- 這提提取了錯誤相關的全部信息, 可以根據實際需要調整
DECLARE @__error_number int,
@__error_message nvarchar(2048),
@__error_severity int,
@__error_state int,
@__error_line int,
@__error_procedure nvarchar(126),
@__user_name nvarchar(128),
@__host_name nvarchar(128);
SELECT @__error_number = ERROR_NUMBER(),
@__error_message = ERROR_MESSAGE(),
@__error_severity = ERROR_SEVERITY(),
@__error_state = ERROR_STATE(),
@__error_line = ERROR_LINE(),
@__error_procedure = ERROR_PROCEDURE(),
@__user_name = SUSER_SNAME(),
@__host_name = HOST_NAME();
-- c. 如果沒有打算在CATCH 模塊中對錯誤進行處理, 則應該拋出錯誤給調用者
RAISERROR
(
N'User: %s, Host: %s, Procedure: %s, Error %d, Level %d, State %d, Line %d, Message: %s ',
@__error_severity,
1,
@__user_name,
@__host_name,
@__error_procedure,
@__error_number,
@__error_severity,
@__error_state,
@__error_line,
@__error_message
);
END CATCH;
GO
-- 數據同步的存儲過程- 同步未更新的數據
-- 循環更新,直到手工結束
-- 可設置一個在SQL Agent服務啟動時工作的JOB 來調用這個存儲過程,這樣就可以實現自動同步
CREATE PROC dbo.p_SyncChangeData_Circle_tb_Srouce_Target
@interval int = 1 -- 循環檢測之間相隔的秒數(指定負數或NULL值會調整為)
AS
SET NOCOUNT ON;
IF @interval IS NULL OR @interval < 0
SET @interval = 1;
DECLARE @last_sync_version bigint,
@min_valid_version bigint,
@date_delay datetime;
WHILE 1 = 1
BEGIN
EXEC dbo.p_SyncChangeData_tb_Srouce_Target
@last_sync_version = @last_sync_version OUTPUT,
@min_valid_version = @min_valid_version OUTPUT;
IF @min_valid_version IS NOT NULL
BEGIN
RAISERROR(N'某些未同步的數據已經丟失', 16, 1);
BREAK;
END;
IF @last_sync_version IS NULL
BEGIN
RAISERROR(N'源數據異常,同步版本號小於已經同步的版本號', 16, 1);
BREAK;
END;
SET @date_delay = DATEADD(Second, @interval, GETDATE());
WAITFOR TIME @date_delay;
END
GO
3.同步測試
-- ====================================================
-- 測試
USE DB_test;
GO
-- A.01. 變更源表數據
INSERT dbo.tb_source
(
col1
)
VALUES
(
1
);
INSERT dbo.tb_source
(
col1
)
VALUES
(
2
),
(
3
);
UPDATE dbo.tb_source SET
col2 = 'update'
WHERE pk_id = 2;
-- A.02. 同步源表數據,並查詢同步結果
DECLARE @last_sync_version bigint,
@min_valid_version bigint;
EXEC dbo.p_SyncChangeData_tb_Srouce_Target
@last_sync_version = @last_sync_version OUTPUT,
@min_valid_version = @min_valid_version OUTPUT;
IF @min_valid_version IS NOT NULL
RAISERROR(N'某些未同步的數據已經丟失', 16, 1);
IF @last_sync_version IS NULL
RAISERROR(N'源數據異常,同步版本號小於已經同步的版本號', 16, 1);
SELECT * FROM dbo.tb_source;
SELECT * FROM dbo.tb_Target;
SELECT * FROM dbo.tb_Change_Tracking;
-- B.01. 變更源表數據
INSERT dbo.tb_source
(
col1
)
VALUES
(
11
);
UPDATE dbo.tb_source SET
col2 = 'update.1'
WHERE pk_id = 2;
UPDATE dbo.tb_source SET
col2 = 'update 3'
WHERE pk_id = 3;
DELETE FROM dbo.tb_source
WHERE pk_id < 3;
SET IDENTITY_INSERT dbo.tb_source ON;
INSERT dbo.tb_source
(
pk_id, col1, col2
)
VALUES
(
1, 11, 'insert'
);
SET IDENTITY_INSERT dbo.tb_source OFF;
-- B.02. 同步源表數據,並查詢同步結果
EXEC dbo.p_SyncChangeData_tb_Srouce_Target
@last_sync_version = @last_sync_version OUTPUT,
@min_valid_version = @min_valid_version OUTPUT;
IF @min_valid_version IS NOT NULL
RAISERROR(N'某些未同步的數據已經丟失', 16, 1);
IF @last_sync_version IS NULL
RAISERROR(N'源數據異常,同步版本號小於已經同步的版本號', 16, 1);
SELECT * FROM dbo.tb_source;
SELECT * FROM dbo.tb_Target;
SELECT * FROM dbo.tb_Change_Tracking;
GO
4.刪除示例測試環境
-- ====================================================
-- /* -- 刪除測試環境
USE DB_test;
GO
IF OBJECT_ID(N'dbo.p_SyncChangeData_tb_Srouce_Target') IS NOT NULL
DROP PROC dbo.p_SyncChangeData_tb_Srouce_Target;
IF OBJECT_ID(N'dbo.tb_source') IS NOT NULL
DROP TABLE dbo.tb_source;
IF OBJECT_ID(N'dbo.tb_Target') IS NOT NULL
DROP TABLE dbo.tb_Target;
IF OBJECT_ID(N'dbo.tb_Change_Tracking') IS NOT NULL
DROP TABLE dbo.tb_Change_Tracking;
GO
--*/
/*-- 刪除測試數據庫
USE master;
GO
ALTER DATABASE DB_test SET
SINGLE_USER
WITH
ROLLBACK AFTER 0;
GO
DROP DATABASE DB_test;
--*/
附:
關於使用更改跟蹤實現數據同步的一些補充說明:
1. 使用 SNAPSHOT 事務隔離級別
在進行同步的處理中,需要記錄當前已經處理的最後一個版本號;而在查詢更改跟蹤信息時,無法指定要查詢的更改跟蹤信息的結束版本號。這就要求同步處理過程中,無論是查詢更改的最後一個版本號,還是查詢更改跟蹤信息,都能保證是同一個變更版本的結束版本號。把所有的處理放在事務中,並使用SNAPSHOT事務隔離級別可以保證這點。
2. 自動同步
更改跟蹤的信息只能被查詢,所以在示例中,是在數據變更後,手動運行同步的存儲過程實現數據同步。如果要自動同步,則只能使用定時查詢的方式。本示例提供了一個名為“dbo.p_SyncChangeData_Circle_tb_Srouce_Target”的存儲過程,可以使用Job或者其他程序在SQL Server啟動時,調用這個存儲過程,則這個存儲過程會定時檢查數據變更,並實現數據變更的自動同步。