思路:ThreadPool的靜態方法RegisterWaitForSingleObject是專門解決這個問題的,它可以等待一個 等待句柄,在這個等待句柄收到信號後執行一個回調方法,從而解決了利用一個單獨的線程去等待信號的 問題,減少了空閒的線程。為了演示這個場景,我們用HttpListener類來創建一個網絡服務端,收到客戶 端的請求後要異步調用一個方法去處理請求,在這個請求處理完或者超時之前,不會給客戶端返回 Response的,客戶端的請求一直在這兒等著 。ThreadPool.RegisterWaitForSingleObject注冊了異步方 法調用返回的等待句柄,等異步方法執行完後會給這個等待句柄發送信號,從而執行回調,如果在指定的 時間內等待句柄沒有收到信號,我們就給客戶端返回504超時的應答,否則就把處理結果返回給客戶端。
當然ThreadPool.RegisterWaitForSingleObject也可以注冊一個全局的等待句柄,而不是異步方法調 用返回的等待句柄,這樣可以更靈活一些,可以給後端服務發送請求,然後等待全局句柄一段兒時間,等 收到後端服務的相應的Response後再給這個全局等待句柄發送信號,從而把處理結果返回給客戶端。
另外就是由於WaitOrTimerCallback委托不接受按引用傳遞的object參數,所以我們用一個全局的字典 (_dictHandlerResults)去保存每個請求處理的結果信息,在收到請求後獲取一個序列號,同時告訴給 異步的請求處理方法(asyncHandlerRequest)和線程池注冊的等待句柄回調方法 (RequestCompleteCallback)。請求處理方法在處理完請求後以傳入的序列號為key把處理結果放入全局 的請求處理結果字典裡,線程池等待句柄回調方法也根據這個序列號去全局字典裡取請求處理結果。考慮 到排隊的請求可能不會是無窮大的,所以全局字典有上限,暫定為10萬,然後序列號的取值是從1w到10w ,這樣全局字典可以不用加任何鎖就可以用,而且幾乎不會發生通過某個序列號取到的請求處理結果和這 個請求不匹配,除非請求的排隊大於10萬。
服務端的實現代碼如下:
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Runtime.Remoting.Messaging;
using System.Text;
using System.Threading;
namespace Server
{
internal class Program
{
private static void Main(string[] args)
{
try
{
MyHttpServer myHttpServer = new MyHttpServer();
myHttpServer.Start();
Console.ReadKey();
myHttpServer.Stop();
}
catch (Exception ex)
{
Console.WriteLine(ex);
Console.ReadKey();
}
}
}
internal class MyHttpServer
{
private const string PREFIXES = "http://+:8080/service/";
private const int MAX_PENDING_REQUEST = 100000;
private static int _currentSeq = 0;
private static readonly Random _random = new Random();
private static readonly Dictionary<int, string> _dictHandlerResults = new Dictionary<int, string> (MAX_PENDING_REQUEST);
private readonly HttpListener _listener;
public MyHttpServer()
{
if (!HttpListener.IsSupported)
throw new NotSupportedException("系統不支持HttpListener");
_listener = new HttpListener();
for (int i = 0; i < MAX_PENDING_REQUEST; i++)
{
_dictHandlerResults.Add(i,string.Empty);
}
}
public void Start()
{
_listener.Prefixes.Add(PREFIXES);
_listener.Start();
Console.WriteLine("Listening");
_listener.BeginGetContext (ListenerCallback, _listener);
}
public void Stop()
{
_listener.Stop();
}
private static int getSeq()
{
if (++_currentSeq == MAX_PENDING_REQUEST)
_currentSeq = 0;
return _currentSeq;
}
private static void showThreadCount()
{
int workerThreads, ioThreads, maxWorkerThreads, maxIoThreads;
ThreadPool.GetMaxThreads(out maxWorkerThreads, out maxIoThreads);
ThreadPool.GetAvailableThreads(out workerThreads, out ioThreads);
Console.WriteLine("workerThreads: {0},ioThreads:{1}", maxWorkerThreads - workerThreads, maxIoThreads - ioThreads);
}
private static void ListenerCallback(IAsyncResult result)
{
try
{
HttpListener listener = (HttpListener) result.AsyncState;
listener.BeginGetContext(ListenerCallback, listener);
showThreadCount();
HttpListenerContext context = listener.EndGetContext(result);
HttpListenerRequest request = context.Request;
HttpListenerResponse response = context.Response;
int tempKey = getSeq();
Console.WriteLine("current seq: {0}",tempKey);
//這裡模仿異步的IO密集型的操作,如果是計算密集型操作就 不用異步調用了
//因為這裡本身就是線程池線程在處理,可以做一些異步的 Remoting調用,Socket調用等。
//或者給後端服務發送請求後,等應答回來後找到 匹配的WaitHandler調用其set方法。
asyncHandlerRequestDelegate d = asyncHandlerRequest;
IAsyncResult ar = d.BeginInvoke(request, tempKey, asyncHandlerRequestCallback, null);
ThreadPool.
RegisterWaitForSingleObject(
ar.AsyncWaitHandle,
RequestCompleteCallback,
new object[] { response, tempKey },
5000, true);
}
catch (Exception ex) {Console.WriteLine(ex);}
}
private delegate void asyncHandlerRequestDelegate(HttpListenerRequest request, int key);
private static void asyncHandlerRequest(HttpListenerRequest request, int key)
{
int rnd = _random.Next(1, 8);
Thread.Sleep(rnd * 1000); // 這裡模仿復雜的處理 ,真實系統中打死你你也別在線程池線程裡sleep哦
_dictHandlerResults[key] = string.Format("this request handler by {0} miniut.", rnd);
}
private static void asyncHandlerRequestCallback(IAsyncResult ar)
{
asyncHandlerRequestDelegate d = (asyncHandlerRequestDelegate)((AsyncResult) ar).AsyncDelegate;
d.EndInvoke(ar);
}
private static void RequestCompleteCallback(object state, bool isTimeout)
{
object[] states = (object[])state;
HttpListenerResponse response = (HttpListenerResponse)states[0];
int key = (int)states[1];
Stream output = null;
try
{
output = response.OutputStream;
string requestHandlerResult;
if (isTimeout)
requestHandlerResult = "504 time out";
else
_dictHandlerResults.TryGetValue(key, out requestHandlerResult);
byte[] buffer = Encoding.UTF8.GetBytes (requestHandlerResult);
response.ContentLength64 = buffer.Length;
output.Write(buffer, 0, buffer.Length);
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
finally
{
try { if (output != null) output.Close(); }catch { }
}
}
}
}
客戶端模擬一些HTTP請求,代碼如下
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Net;
using System.Threading;
namespace Client
{
class Program
{
static void Main(string[] args)
{
for (int i = 0; i < 10000; i++)
{
WebRequest Request = WebRequest.Create("http://localhost:8080/service/abc.aspx");
Request.BeginGetResponse(GetResponseCallback, Request);
Thread.Sleep (1000);
}
Console.ReadKey();
}
static void GetResponseCallback(IAsyncResult ar)
{
try
{
WebRequest request = (WebRequest)ar.AsyncState;
WebResponse response = request.EndGetResponse(ar);
StreamReader streamReader = new StreamReader(response.GetResponseStream(),
Encoding.UTF8);
Console.WriteLine(streamReader.ReadToEnd());
}
catch (Exception ex){ Console.WriteLine(ex);}
}
}
}
小結:不知道這樣大量的使用等待句柄會不會有性能問題,還有就是不知道.net的httplistener對象 的實現性能如何,有無bug。