在系統中,我們經常會遇到這樣的需求:將大量(比如幾十萬、甚至上百萬)的對象進行排序,然後只需要取出最Top的前N名作為排行榜的數據,這即是一個TopN算法。常見的解決方案有三種:
(1)直接使用List的Sort方法進行處理。
(2)使用排序二叉樹進行排序,然後取出前N名。
(3)使用最大堆排序,然後取出前N名。
第一種方案的性能是最差的,後兩種方案性能會好一些,但是還是不能滿足我們的需求。最主要的原因在於使用二叉樹和最大堆排序時,都是對所有的對象進行排序,而不是將代價花費在我們需要的少數的TopN上。為此,我自己實現了TopNOrderedContainer來解決這個問題。
思路是這樣的,使用一個長度為N的數組,來存放最Top的N個對象,越Top的對象其在數組中的Index就越小。這樣,每次加入一個對象時,就與Index最大的那個對象比較,如果比其更Top,則交換兩個對象的位置。如果被交換的對象是數組中的最後一個對象(Index最大),則該對象會被拋棄。如此,可以保證容器中始終保持的都是最Top的N個對象。
接下來我們看具體的實現。
如果一個對象要參與TopN排行榜,則其必須實現IOrdered接口,表明其可以被Top排序。
/// <summary>
/// IOrdered 參與排行榜排序的對象必須實現的接口。
/// </summary>
/// <typeparam name="TOrderedObj">參與排行榜排序的對象的類型</typeparam>
public interface IOrdered<TOrderedObj>
{
bool IsTopThan(TOrderedObj other);
}
之所以使用泛型參數TOrderedObj,是為了避免派生類在實現IsTopThan方法時,需要將參數other進行向下轉換。
接下來是TopNOrderedContainer實現的源碼:
/// <summary>
/// TopNOrderedContainer 用於始終保持排行榜前N名的Object。該實現是線程安全的。
/// zhuweisky 2009.05.23
/// </summary>
/// <typeparam name="TID">被排名的對象的標志類型</typeparam>
/// <typeparam name="TObj">被排名的對象類型</typeparam>
public class TopNOrderedContainer<TObj> where TObj : IOrdered<TObj>
{
private TObj[] orderedArray = null;
private int validObjCount = 0;
private SmartRWLocker smartRWLocker = new SmartRWLocker();
#region TopNumber
private int topNumber = 10;
public int TopNumber
{
get { return topNumber; }
set { topNumber = value; }
}
#endregion
#region Ctor
public TopNOrderedContainer() { }
public TopNOrderedContainer(int _topNumber)
{
this.topNumber = _topNumber;
}
#endregion
#region Initialize
public void Initialize()
{
if (this.topNumber < 1)
{
throw new Exception("The value of TopNumber must greater than 0 ");
}
this.orderedArray = new TObj[this.topNumber];
}
#endregion
#region Add List
public void Add(IList<TObj> list)
{
if (list == null)
{
return;
}
using (this.smartRWLocker.Lock(AccessMode.Write))
{
foreach (TObj obj in list)
{
this.DoAdd(obj);
}
}
}
#endregion
#region Add
public void Add(TObj obj)
{
using (this.smartRWLocker.Lock(AccessMode.Write))
{
this.DoAdd(obj);
}
}
#endregion
#region GetTopN
public TObj[] GetTopN()
{
using (this.smartRWLocker.Lock(AccessMode.Read))
{
return (TObj[])this.orderedArray.Clone();
}
}
#endregion
#region Private
#region DoAdd
private void DoAdd(TObj obj)
{
if (obj == null)
{
return;
}
if (this.validObjCount < this.topNumber)
{
this.orderedArray[this.validObjCount] = obj;
this.Adjust(this.validObjCount);
++this.validObjCount;
return;
}
if (this.orderedArray[this.topNumber - 1].IsTopThan(obj))
{
return;
}
this.orderedArray[this.topNumber - 1] = obj;
this.Adjust(this.topNumber - 1);
}
#endregion
#region Adjust
/// <summary>
/// Adjust 調整posIndex處的對象到合適的位置。
/// 與相鄰前一個對象比較,如果當前對象更加Top,則與前一個對象交換位置。
/// </summary>
private void Adjust(int posIndex)
{
TObj obj = this.orderedArray[posIndex];
for (int index = posIndex; index > 0; index--)
{
if (obj.IsTopThan(this.orderedArray[index - 1]))
{
TObj temp = this.orderedArray[index - 1];
this.orderedArray[index - 1] = obj;
this.orderedArray[index] = temp;
}
else
{
break;
}
}
}
#endregion
#endregion
}
源碼面前毫無秘密。
但是有幾點我還是需要說明一下:
(1)ESBasic.ObjectManagement.TopNOrderedContainer位於我的ESBasic.dll類庫中,其實現時用到的SmartRWLocker是一個讀寫鎖,也是ESBasic.dll類庫中的一員。你可以從這裡下載ESBasic.dll直接試用。
(2)為何不將TopN排序直接實現為一個靜態方法,如:
public static TObj[] GetTopN<TObj>(IList<TObj> list) where TObj : IOrdered<TObj>
如果要是這樣實現,那我們就沒有辦法繼續動態的Add新的TObj對象進來,如果要達到這樣的目的,就只有構造新的list,再次調用static GetTopN方法,如此會重復做一些工作。
最後,我們來測試一下TopNOrderedContainer與List.Sort方法的性能比較,測試的對象數目為500000個,取出Top20。測試代碼如下:
public class UserData : IOrdered<UserData>
{
#region UserID
private string userID;
public string UserID
{
get { return userID; }
set { userID = value; }
}
#endregion
#region Score
private int score;
public int Score
{
get { return score; }
set { score = value; }
}
#endregion
public UserData(string _userID, int _score)
{
this.userID = _userID;
this.score = _score;
}
#region IOrdered<string> 成員
public bool IsTopThan(UserData other)
{
return this.Score > other.Score;
}
public override string ToString()
{
return this.score.ToString();
}
#endregion
}
private void button4_Click(object sender, EventArgs e)
{
List<UserData> list = new List<UserData>();
for (int i = 0; i < 500000; i++)
{
list.Add(new UserData("User" + i.ToString(), i * i * i - 3 * i * i + 4 * i + 8));
}
List<UserData> list2 = new List<UserData>();
for (int i = 0; i < 500000; i++)
{
list2.Add(new UserData("User" + i.ToString(), i * i * i - 3 * i * i + 4 * i + 8));
}
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
list.Sort(this);
stopwatch.Stop();
long ms1 = stopwatch.ElapsedMilliseconds;
stopwatch.Reset();
stopwatch.Start();
TopNOrderedContainer<UserData> container = new TopNOrderedContainer<UserData>(20);
container.Initialize();
container.Add(list2);
UserData[] res = container.GetTopN();
stopwatch.Stop();
long ms2 = stopwatch.ElapsedMilliseconds;
}
#region IComparer<UserData> 成員
public int Compare(UserData x, UserData y)
{
return (y.Score - x.Score);
}
#endregion
測試的結果顯示,使用List.Sort方法需要1287ms,而TopNOrderedContainer只花了78ms