程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> .NET網頁編程 >> 關於.NET >> .Net Cancellable Task - APM異步超時機制擴展

.Net Cancellable Task - APM異步超時機制擴展

編輯:關於.NET

概述

.NET基於委托的APM(Asynchronous Programming Model)模型通過BeginInvoke, EndInvoke, AsyncCallback,IAsyncResult的組合使用,讓程序員可以方便的進行異步調用、異步回調和同步等待等操作。但.NET平台還沒有為線程的中止(abort)提供安全可靠的機制,也許正是基於這個原因APM並沒有包含異步調用的超時機制,而是把這個可能引起爭議的工作交給使用者自己來把握。

作為APM模型的補充,本文通過CancellableTask類提供了一個異步調用超時機制。CancellableTask類的設計有兩個主要的考慮:

1.保持APM風格,使用者依然可以使用熟悉的BeginInvoke, EndInvoke, IAsyncResult, AsyncCallback等;

2.提供基於Thread.Abort的默認超時處理,同時支持用戶自定義cancel回調。

使用

CancellableTask的構造函數包含workCallbak和cancelCallback(可選)兩參數,分別對應work回調和cancel回調。CancellableTask的BeginInvoke保持了APM的風格,可以看作是增加了timeout參數(單位:ms)的擴展版;而EndInvoke,AsyncCallback以及IAsyncResult的使用都和APM保持一致。Work委托產生的異常會在EndInvoke時拋出,同時若線程被超時中止,EndInvoke則會拋出ThreadAbortException異常。

下面是一段CancellableTask的使用示例:

class Program
{
    static void Main(string[] args)
    {
        //默認超時直接abort線程
        {
            Console.WriteLine("[case 1]");
            CancellableTask cancellableTask = new CancellableTask(Work);
            State arg = new State { Loop = 20, Stop = false };
            IAsyncResult asyncResult = cancellableTask.BeginInvoke(
                arg, 
                (ar => Console.WriteLine("Async Callback")), 
                null, 
                10 * 1000);
            asyncResult.AsyncWaitHandle.WaitOne();
            try
            {
                object r = cancellableTask.EndInvoke(asyncResult);
                Console.WriteLine("return " + r);
            }
            catch (ThreadAbortException)
            {
                Console.WriteLine("Thread Aborted");
            }
            catch (Exception exp)
            {
                Console.WriteLine(exp.ToString());
            }
        }
        //自定義Cancel回調
        {
            Console.WriteLine(Environment.NewLine + "[case 2]");
            CancellableTask cancellableTask = new CancellableTask(Work, Cancel);
            State arg = new State { Loop = 20, Stop = false };
            IAsyncResult asyncResult = cancellableTask.BeginInvoke(
                arg,
                (ar =>
                    {
                        try
                        {
                            object r = cancellableTask.EndInvoke(ar);
                            Console.WriteLine("return " + r);
                        }
                        catch (ThreadAbortException)
                        {
                            Console.WriteLine("Thread Aborted");
                        }
                        catch (Exception exp)
                        {
                            Console.WriteLine(exp.ToString());
                        }
                    }
                ),
                arg,
                10 * 1000);
        }
        Console.ReadLine();
    }
    static object Work(object arg)
    {
        State state = arg as State;
        int i;
        for (i = 0; i < state.Loop; i++)
        {
            if (state.Stop) break;
            Console.WriteLine(i);
            Thread.Sleep(1000);
        }
        return i;
    }
    static void Cancel(object state)
    {
        State st = state as State;
        st.Stop = true;
    }
}
internal class State
{
    public int Loop { get; set; }
    public bool Stop { get; set; }
}

實現

CancellableTask通過wrapper對workCallback進行包裝。Wrapper內部首先創建等待事件e,並通過ThreadPool.RegisterWaitForSingleObject注冊事件和WaitOrTimeout回調,然後調用workCallback。若workCallback提前返回,調用e.Set(),ThreadPool會調用WaitOrTimeout回調,isTimeout參數為false,不進行處理;否則,當workCallback超時未返回,ThreadPool會調用WaitOrTimeout回調,isTimeout參數為true。WaitOrTimeout回調在isTimeout情況下,首先判斷是否有自定義cancel回調,如果有則采用自定義回調;否則,默認情況下調用Thread.Abort終止work線程。下面是CancellableTask的實現細節:

public class CancellableTask
{
    public delegate object WorkCallback(object arg);
    public delegate void CancelCallback(object state);
    protected class TimeoutState
    {
        internal Thread thread;
        internal object state;
        public TimeoutState(Thread thread, object state)
        {
            this.thread = thread;
            this.state = state;
        }
    }
    protected WorkCallback workCallback;
    protected CancelCallback cancelCallback;
    protected WorkCallback wrapper;
    public CancellableTask(WorkCallback workCallback)
    {
        this.workCallback = workCallback;
    }
    public CancellableTask(WorkCallback workCallback, CancelCallback cancelCallback)
    {
        this.workCallback = workCallback;
        this.cancelCallback = cancelCallback;
    }
    public IAsyncResult BeginInvoke(object arg, AsyncCallback asyncCallback, object state, int timeout)
    {
        wrapper = delegate(object argv)
        {
            AutoResetEvent e = new AutoResetEvent(false);
            try
            {
                TimeoutState waitOrTimeoutState = new TimeoutState(Thread.CurrentThread, state);
                ThreadPool.RegisterWaitForSingleObject(e, WaitOrTimeout, waitOrTimeoutState, timeout, true);
                return workCallback(argv);
            }
            finally
            {
                e.Set();
            }
        };
        IAsyncResult asyncResult = wrapper.BeginInvoke(arg, asyncCallback, state);
        return asyncResult;
    }
    public object EndInvoke(IAsyncResult result)
    {
        return wrapper.EndInvoke(result);
    }
    protected void WaitOrTimeout(object state, bool isTimeout)
    {
        try
        {
            if (isTimeout)
            {
                TimeoutState waitOrTimeoutState = state as TimeoutState;
                if (null != cancelCallback)
                {
                    cancelCallback(waitOrTimeoutState.state);
                }
                else
                {
                    waitOrTimeoutState.thread.Abort();
                }
            }
        }
        catch { }
    }
}

總結

本文為.NET APM模型提供了異步超時機制擴展,一方面保持了APM編程風格,另一方面支持用戶自定義cancel回調。需要注意的是,默認的cancel方式Thread.Abort的安全性問題,使用時應注意資源釋放等。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved