程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> 網頁編程 >> PHP編程 >> 關於PHP編程 >> PHP中使用協同程序實現合作多任務

PHP中使用協同程序實現合作多任務

編輯:關於PHP編程

    PHP5.5一個比較好的新功能是實現對生成器和協同程序的支持。對於生成器,PHP的文檔和各種其他的博客文章(就像這一個或這一個)已經有了非常詳細的講解。協同程序相對受到的關注就少了,所以協同程序雖然有很強大的功能但也很難被知曉,解釋起來也比較困難。

    這篇文章指導你通過使用協同程序來實施任務調度,通過實例實現對技術的理解。我將在前三節做一個簡單的背景介紹。如果你已經有了比較好的基礎,可以直接跳到“協同多任務處理”一節。

    生成器

    生成器最基本的思想也是一個函數,這個函數的返回值是依次輸出,而不是只返回一個單獨的值。或者,換句話說,生成器使你更方便的實現了迭代器接口。下面通過實現一個xrange函數來簡單說明:

    復制代碼 代碼如下:
    <?php
    function xrange($start, $end, $step = 1) {
        for ($i = $start; $i <= $end; $i += $step) {
            yield $i;
        }
    }

    foreach (xrange(1, 1000000) as $num) {
        echo $num, "n";
    }

    上面這個xrange()函數提供了和PHP的內建函數range()一樣的功能。但是不同的是range()函數返回的是一個包含屬組值從1到 100萬的數組(注:請查看手冊)。而xrange()函數返回的是依次輸出這些值的一個迭代器,而且並不會真正以數組形式計算。

    這種方法的優點是顯而易見的。它可以讓你在處理大數據集合的時候不用一次性的加載到內存中。甚至你可以處理無限大的數據流。

    當然,也可以不同通過生成器來實現這個功能,而是可以通過繼承Iterator接口實現。通過使用生成器實現起來會更方便,而不用再去實現iterator接口中的5個方法了。

    生成器為可中斷的函數
    要從生成器認識協同程序,理解它們內部是如何工作的非常重要:生成器是可中斷的函數,在它裡面,yield構成了中斷點。 

    緊接著上面的例子,如果你調用xrange(1,1000000)的話,xrange()函數裡代碼沒有真正地運行。相反,PHP只是返回了一個實現了迭代器接口的 生成器類實例: 
     

    復制代碼 代碼如下:
    <?php
    $range = xrange(1, 1000000);
    var_dump($range); // object(Generator)#1
    var_dump($range instanceof Iterator); // bool(true)

    你對某個對象調用迭代器方法一次,其中的代碼運行一次。例如,如果你調用$range->rewind(),那麼xrange()裡的代碼運 行到控制流 第一次出現yield的地方。在這種情況下,這就意味著當$i=$start時yield $i才運行。傳遞給yield語句的值是使用$range->current()獲取的。

     為了繼續執行生成器中的代碼,你必須 調用$range->next()方法。這將再次啟動生成器,直到yield語句出現。因此,連續調用next()和current()方法 你將能從生成器裡獲得所有的值,直到某個點沒有再出現yield語句。對xrange()來說,這種情形出現在$i超過$end時。在這中情況下, 控制流將到達函數的終點,因此將不執行任何代碼。一旦這種情況發生,vaild()方法將返回假,這時迭代結束。

    協程

    協程給上面功能添加的主要東西是回送數據給生成器的能力。這將把生成器到調用者的單向通信轉變為兩者之間的雙向通信。
    通過調用生成器的send()方法而不是其next()方法傳遞數據給協程。下面的logger()協程是這種通信如何運行的例子: 

    復制代碼 代碼如下:
    <?php

    function logger($fileName) {
        $fileHandle = fopen($fileName, 'a');
        while (true) {
            fwrite($fileHandle, yield . "n");
        }
    }

    $logger = logger(__DIR__ . '/log');
    $logger->send('Foo');
    $logger->send('Bar')

    正如你能看到,這兒yield沒有作為一個語句來使用,而是用作一個表達式。即它有一個返回值。yield的返回值是傳遞給send()方法的值。 在這個例子裡,yield將首先返回"Foo",然後返回"Bar"。

    上面的例子裡yield僅作為接收者。混合兩種用法是可能的,即既可接收也可發送。接收和發送通信如何進行的例子如下:

    復制代碼 代碼如下:
    <?php

    function gen() {
        $ret = (yield 'yield1');
        var_dump($ret);
        $ret = (yield 'yield2');
        var_dump($ret);
    }

    $gen = gen();
    var_dump($gen->current());    // string(6) "yield1"
    var_dump($gen->send('ret1')); // string(4) "ret1"   (the first var_dump in gen)
                                  // string(6) "yield2" (the var_dump of the ->send() return value)
    var_dump($gen->send('ret2')); // string(4) "ret2"   (again from within gen)
                                  // NULL               (the return value of ->send())

    馬上理解輸出的精確順序有點困難,因此確定你知道為什按照這種方式輸出。我願意特別指出的有兩點:第一點,yield表達式兩邊使用 圓括號不是偶然。由於技術原因(雖然我已經考慮為賦值增加一個異常,就像Python那樣),圓括號是必須的。第二點,你可能已經注意到 調用current()之前沒有調用rewind()。如果是這麼做的,那麼已經隱含地執行了rewind操作。 

    多任務協作

    如果閱讀了上面的logger()例子,那麼你認為“為了雙向通信我為什麼要使用協程呢? 為什麼我不能只用常見的類呢?”,你這麼問完全正確。上面的例子演示了基本用法,然而上下文中沒有真正的展示出使用協程的優點。這就是列舉許多協程例子的 理由。正如上面介紹裡提到的,協程是非常強大的概念,不過這樣的應用很稀少而且常常十分復雜。給出一些簡單而真實的例子很難。

    在這篇文章裡,我決定去做的是使用協程實現多任務協作。我們盡力解決的問題是你想並發地運行多任務(或者“程序”)。不過處理器在一個時刻只能運行 一個任務(這篇文章的目標是不考慮多核的)。因此處理器需要在不同的任務之間進行切換,而且總是讓每個任務運行 “一小會兒”。 

    多任務協作這個術語中的“協作”說明了如何進行這種切換的:它要求當前正在運行的任務自動把控制傳回給調度器,這樣它就可以運行其他任務了。這與 “搶占”多任務相反,搶占多任務是這樣的:調度器可以中斷運行了一段時間的任務,不管它喜歡還是不喜歡。協作多任務在Windows的早期版本 (windows95)和Mac OS中有使用,不過它們後來都切換到使用搶先多任務了。理由相當明確:如果你依靠程序自動傳回 控制的話,那麼壞行為的軟件將很容易為自身占用整個CPU,不與其他任務共享。 

    這個時候你應當明白協程和任務調度之間的聯系:yield指令提供了任務中斷自身的一種方法,然後把控制傳遞給調度器。因此協程可以運行多個其他任務。更進一步來說,yield可以用來在任務和調度器之間進行通信。

    我們的目的是 對 “任務”用更輕量級的包裝的協程函數:
     

    復制代碼 代碼如下:
    <?php

    class Task {
        protected $taskId;
        protected $coroutine;
        protected $sendValue = null;
        protected $beforeFirstYield = true;

        public function __construct($taskId, Generator $coroutine) {
            $this->taskId = $taskId;
            $this->coroutine = $coroutine;
        }

        public function getTaskId() {
            return $this->taskId;
        }

        public function setSendValue($sendValue) {
            $this->sendValue = $sendValue;
        }

        public function run() {
            if ($this->beforeFirstYield) {
                $this->beforeFirstYield = false;
                return $this->coroutine->current();
            } else {
                $retval = $this->coroutine->send($this->sendValue);
                $this->sendValue = null;
                return $retval;
            }
        }

        public function isFinished() {
            return !$this->coroutine->valid();
        }
    }

    一個任務是用 任務ID標記一個協程。使用setSendValue()方法,你可以指定哪些值將被發送到下次的恢復(在之後你會了解到我們需要這個)。 run()函數確實沒有做什麼,除了調用send()方法的協同程序。要理解為什麼添加beforeFirstYieldflag,需要考慮下面的代碼片 段:

    復制代碼 代碼如下:
    <?php

    function gen() {
        yield 'foo';
        yield 'bar';
    }

    $gen = gen();
    var_dump($gen->send('something'));

    // As the send() happens before the first yield there is an implicit rewind() call,
    // so what really happens is this:
    $gen->rewind();
    var_dump($gen->send('something'));

    // The rewind() will advance to the first yield (and ignore its value), the send() will
    // advance to the second yield (and dump its value). Thus we loose the first yielded value!

    通過添加 beforeFirstYieldcondition 我們可以確定 first yield 的值 被返回。 

    調度器現在不得不比多任務循環要做稍微多點了,然後才運行多任務:
     

    復制代碼 代碼如下:
    <?php

    class Scheduler {
        protected $maxTaskId = 0;
        protected $taskMap = []; // taskId => task
        protected $taskQueue;

        public function __construct() {
            $this->taskQueue = new SplQueue();
        }

        public function newTask(Generator $coroutine) {
            $tid = ++$this->maxTaskId;
            $task = new Task($tid, $coroutine);
            $this->taskMap[$tid] = $task;
            $this->schedule($task);
            return $tid;
        }

        public function schedule(Task $task) {
            $this->taskQueue->enqueue($task);
        }

        public function run() {
            while (!$this->taskQueue->isEmpty()) {
                $task = $this->taskQueue->dequeue();
                $task->run();

                if ($task->isFinished()) {
                    unset($this->taskMap[$task->getTaskId()]);
                } else {
                    $this->schedule($task);
                }
            }
        }
    }

     newTask()方法(使用下一個空閒的任務id)創建一個新任務,然後把這個任務放入任務映射數組裡。接著它通過把任務放入任務隊列裡來實現 對任務的調度。接著run()方法掃描任務隊列,運行任務。如果一個任務結束了,那麼它將從隊列裡刪除,否則它將在隊列的末尾再次被調度。
     讓我們看看下面具有兩個簡單(並且沒有什麼意義)任務的調度器: 

    復制代碼 代碼如下:
    <?php

    function task1() {
        for ($i = 1; $i <= 10; ++$i) {
            echo "This is task 1 iteration $i.n";
            yield;
        }
    }

    function task2() {
        for ($i = 1; $i <= 5; ++$i) {
            echo "This is task 2 iteration $i.n";
            yield;
        }
    }

    $scheduler = new Scheduler;

    $scheduler->newTask(task1());
    $scheduler->newTask(task2());

    $scheduler->run();

     兩個任務都僅僅回顯一條信息,然後使用yield把控制回傳給調度器。輸出結果如下:

     

    復制代碼 代碼如下:
     This is task 1 iteration 1.
    This is task 2 iteration 1.
    This is task 1 iteration 2.
    This is task 2 iteration 2.
    This is task 1 iteration 3.
    This is task 2 iteration 3.
    This is task 1 iteration 4.
    This is task 2 iteration 4.
    This is task 1 iteration 5.
    This is task 2 iteration 5.
    This is task 1 iteration 6.
    This is task 1 iteration 7.
    This is task 1 iteration 8.
    This is task 1 iteration 9.
    This is task 1 iteration 10.
     

    輸出確實如我們所期望的:對前五個迭代來說,兩個任務是交替運行的,接著第二個任務結束後,只有第一個任務繼續運行。  

    與調度器之間通信

    既然調度器已經運行了,那麼我們就轉向日程表的下一項:任務和調度器之間的通信。我們將使用進程用來和操作系統會話的同樣的方式來通信:系統調用。 我們需要系統調用的理由是操作系統與進程相比它處在不同的權限級別上。因此為了執行特權級別的操作(如殺死另一個進程),就不得不以某種方式把控制傳回給 內核,這樣內核就可以執行所說的操作了。再說一遍,這種行為在內部是通過使用中斷指令來實現的。過去使用的是通用的int指令,如今使用的是更特殊並且更 快速的syscall/sysenter指令。

    我們的任務調度系統將反映這種設計:不是簡單地把調度器傳遞給任務(這樣久允許它做它想做的任何事),我們將通過給yield表達式傳遞信息來與系統調用通信。這兒yield即是中斷,也是傳遞信息給調度器(和從調度器傳遞出信息)的方法。 

    為了說明系統調用,我將對可調用的系統調用做一個小小的封裝:
     

    復制代碼 代碼如下:
    <?php

    class SystemCall {
        protected $callback;

        public function __construct(callable $callback) {
            $this->callback = $callback;
        }

        public function __invoke(Task $task, Scheduler $scheduler) {
            $callback = $this->callback; // Can't call it directly in PHP :/
            return $callback($task, $scheduler);
        }
    }

    它將像其他任何可調用那樣(使用_invoke)運行,不過它要求調度器把正在調用的任務和自身傳遞給這個函數。為了解決這個問題 我們不得不微微的修改調度器的run方法:

    復制代碼 代碼如下:
    <?php
    public function run() {
        while (!$this->taskQueue->isEmpty()) {
            $task = $this->taskQueue->dequeue();
            $retval = $task->run();

            if ($retval instanceof SystemCall) {
                $retval($task, $this);
                continue;
            }

            if ($task->isFinished()) {
                unset($this->taskMap[$task->getTaskId()]);
            } else {
                $this->schedule($task);
            }
        }
    }

    第一個系統調用除了返回任務ID外什麼都沒有做:

    復制代碼 代碼如下:
    <?php
    function getTaskId() {
        return new SystemCall(function(Task $task, Scheduler $scheduler) {
            $task->setSendValue($task->getTaskId());
            $scheduler->schedule($task);
        });
    }

    這個函數確實設置任務id為下一次發送的值,並再次調度了這個任務。由於使用了系統調用,所以調度器不能自動調用任務,我們需要手工調度任務(稍後你將明白為什麼這麼做)。要使用這個新的系統調用的話,我們要重新編寫以前的例子:

    復制代碼 代碼如下:
    <?php

    function task($max) {
        $tid = (yield getTaskId()); // <-- here's the syscall!
        for ($i = 1; $i <= $max; ++$i) {
            echo "This is task $tid iteration $i.n";
            yield;
        }
    }

    $scheduler = new Scheduler;

    $scheduler->newTask(task(10));
    $scheduler->newTask(task(5));

    $scheduler->run();

    這段代碼將給出與前一個例子相同的輸出。注意系統調用同其他任何調用一樣正常地運行,不過預先增加了yield。要創建新的任務,然後再殺死它們的話,需要兩個以上的系統調用:  

    復制代碼 代碼如下:
    <?php

    function newTask(Generator $coroutine) {
        return new SystemCall(
            function(Task $task, Scheduler $scheduler) use ($coroutine) {
                $task->setSendValue($scheduler->newTask($coroutine));
                $scheduler->schedule($task);
            }
        );
    }

    function killTask($tid) {
        return new SystemCall(
            function(Task $task, Scheduler $scheduler) use ($tid) {
                $task->setSendValue($scheduler->killTask($tid));
                $scheduler->schedule($task);
            }
        );
    }

    killTask函數需要在調度器裡增加一個方法:

    復制代碼 代碼如下:
    <?php

    public function killTask($tid) {
        if (!isset($this->taskMap[$tid])) {
            return false;
        }

        unset($this->taskMap[$tid]);

        // This is a bit ugly and could be optimized so it does not have to walk the queue,
        // but assuming that killing tasks is rather rare I won't bother with it now
        foreach ($this->taskQueue as $i => $task) {
            if ($task->getTaskId() === $tid) {
                unset($this->taskQueue[$i]);
                break;
            }
        }

        return true;
    }

    用來測試新功能的微腳本:  

    復制代碼 代碼如下:
    <?php

    function childTask() {
        $tid = (yield getTaskId());
        while (true) {
            echo "Child task $tid still alive!n";
            yield;
        }
    }

    function task() {
        $tid = (yield getTaskId());
        $childTid = (yield newTask(childTask()));

        for ($i = 1; $i <= 6; ++$i) {
            echo "Parent task $tid iteration $i.n";
            yield;

            if ($i == 3) yield killTask($childTid);
        }
    }

    $scheduler = new Scheduler;
    $scheduler->newTask(task());
    $scheduler->run();

    這段代碼將打印以下信息:

    復制代碼 代碼如下:
    Parent task 1 iteration 1.
    Child task 2 still alive!
    Parent task 1 iteration 2.
    Child task 2 still alive!
    Parent task 1 iteration 3.
    Child task 2 still alive!
    Parent task 1 iteration 4.
    Parent task 1 iteration 5.
    Parent task 1 iteration 6.

    經過三次迭代以後子任務將被殺死,因此這就是"Child is still alive"消息結束的時候。可能應當指出的是這不是真正的父子關系。 因為甚至在父任務結束後子任務仍然可以運行。或者子任務可以殺死父任務。可以修改調度器使它具有更層級化的任務結構,不過 在這篇文章裡我沒有這麼做。 

    你可以實現許多進程管理調用。例如 wait(它一直等待到任務結束運行時),exec(它替代當前任務)和fork(它創建一個 當前任務的克隆)。fork非常酷,而且你可以使用PHP的協程真正地實現它,因為它們都支持克隆。 

    然而讓我們把這些留給有興趣的讀者吧,我們去看下一個議題。

    幾點人
    翻譯於 4天前
    0人頂
    頂 翻譯的不錯哦!
     

    非阻塞IO
    很明顯,我們的任務管理系統的真正很酷的應用是web服務器。它有一個任務是在套接字上偵聽是否有新連接,當有新連接要建立的時候  ,它創建一個新任務來處理新連接。
     web服務器最難的部分通常是像讀數據這樣的套接字操作是阻塞的。例如PHP將等待到客戶端完成發送為止。對一個WEB服務器來說,這 根本不行;這就意味著服務器在一個時間點上只能處理一個連接。

    解決方案是確保在真正對套接字讀寫之前該套接字已經“准備就緒”。為了查找哪個套接字已經准備好讀或者寫了,可以使用 流選擇函數。 

    首先,讓我們添加兩個新的 syscall,它們將等待直到指定 socket 准備好:

    復制代碼 代碼如下:
     <?php     
     function waitForRead($socket) { 
         return new SystemCall( 
             function(Task $task, Scheduler $scheduler) use ($socket) { 
                 $scheduler->waitForRead($socket, $task); 
             } 
         ); 
     } 

     function waitForWrite($socket) { 
         return new SystemCall( 
             function(Task $task, Scheduler $scheduler) use ($socket) { 
                 $scheduler->waitForWrite($socket, $task); 
             } 
         ); 
     }
    這些 syscall 只是在調度器中代理其各自的方法:  復制代碼 代碼如下:
     <?php 

     // resourceID => [socket, tasks] 
     protected $waitingForRead = []; 
     protected $waitingForWrite = []; 

     public function waitForRead($socket, Task $task) { 
         if (isset($this->waitingForRead[(int) $socket])) { 
             $this->waitingForRead[(int) $socket][1][] = $task; 
         } else { 
             $this->waitingForRead[(int) $socket] = [$socket, [$task]]; 
         } 
     } 

     public function waitForWrite($socket, Task $task) { 
         if (isset($this->waitingForWrite[(int) $socket])) { 
             $this->waitingForWrite[(int) $socket][1][] = $task; 
         } else { 
             $this->waitingForWrite[(int) $socket] = [$socket, [$task]]; 
         } 
     }
     
    waitingForRead 及 waitingForWrite 屬性是兩個承載等待的socket 及等待它們的任務的數組。有趣的部分在於下面的方法,它將檢查 socket 是否可用,並重新安排各自任務: 復制代碼 代碼如下:
     <?php 

     protected function ioPoll($timeout) { 
         $rSocks = []; 
         foreach ($this->waitingForRead as list($socket)) { 
             $rSocks[] = $socket; 
         } 

         $wSocks = []; 
         foreach ($this->waitingForWrite as list($socket)) { 
             $wSocks[] = $socket; 
         } 

         $eSocks = []; // dummy 

         if (!stream_select($rSocks, $wSocks, $eSocks, $timeout)) { 
             return; 
         } 

         foreach ($rSocks as $socket) { 
             list(, $tasks) = $this->waitingForRead[(int) $socket]; 
             unset($this->waitingForRead[(int) $socket]); 

             foreach ($tasks as $task) { 
                 $this->schedule($task); 
             } 
         } 

         foreach ($wSocks as $socket) { 
             list(, $tasks) = $this->waitingForWrite[(int) $socket]; 
             unset($this->waitingForWrite[(int) $socket]); 

             foreach ($tasks as $task) { 
                 $this->schedule($task); 
             } 
         } 
     }
     
    stream_select 函數接受承載讀取、寫入以及待檢查的socket的數組(我們無需考慮最後一類)。數組將按引用傳遞,函數只會保留那些狀態改變了的數組元素。我們可以遍歷這些數組,並重新安排與之相關的任務。

    為了正常地執行上面的輪詢動作,我們將在調度器裡增加一個特殊的任務: 復制代碼 代碼如下:
     <?php 
     protected function ioPollTask() { 
         while (true) { 
             if ($this->taskQueue->isEmpty()) { 
                 $this->ioPoll(null); 
             } else { 
                 $this->ioPoll(0); 
             } 
             yield; 
         } 
     }
     

    需要在某個地方注冊這個任務,例如,你可以在run()方法的開始增 加$this->newTask($this->ioPollTask())。然後就像其他 任務一樣每執行完整任務循環一次就執行輪詢操作一次(這麼做一定不是最好的方法)。ioPollTask將使用0秒的超時來調用ioPoll, 這意味著stream_select將立即返回(而不是等待)。 
    只有任務隊列為空時,我們才使用null超時,這意味著它一直等到某個套接口准備就緒。如果我們沒有這麼做,那麼輪詢任務將一而再, 再而三的循環運行,直到有新的連接建立。這將導致100%的CPU利用率。相反,讓操作系統做這種等待會更有效。

    現在編寫服務器相對容易了:

    復制代碼 代碼如下:
     <?php 

     function server($port) { 
         echo "Starting server at port $port...n"; 

         $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr); 
         if (!$socket) throw new Exception($errStr, $errNo); 

         stream_set_blocking($socket, 0); 

         while (true) { 
             yield waitForRead($socket); 
             $clientSocket = stream_socket_accept($socket, 0); 
             yield newTask(handleClient($clientSocket)); 
         } 
     } 

     function handleClient($socket) { 
         yield waitForRead($socket); 
         $data = fread($socket, 8192); 

         $msg = "Received following request:nn$data"; 
         $msgLength = strlen($msg); 

         $response = <<<RES 
     HTTP/1.1 200 OKr 
     Content-Type: text/plainr 
     Content-Length: $msgLengthr 
     Connection: closer 
     r 
     $msg
     RES; 

         yield waitForWrite($socket); 
         fwrite($socket, $response); 

         fclose($socket); 
     } 

     $scheduler = new Scheduler; 
     $scheduler->newTask(server(8000)); 
     $scheduler->run();
     
    這段代碼將接收到localhost:8000上的連接,然後僅僅返回發送來的內容作為HTTP響應。要做“實際”的事情的話就愛哪個非常復雜(處理 HTTP請求可能已經超出了這篇文章的范圍)。上面的代碼片段只是演示了一般性的概念。

    你 可以使用類似於ab -n 10000 -c 100 localhost:8000/這樣命令來測試服務器。這條命令將向服務器發送10000個請求,並且其中100個請求將同時到達。使用這樣的數目,我得 到了處於中間的10毫秒的響應時間。不過還有一個問題:有少數幾個請求真正處理的很慢(如5秒), 這就是為什麼總吞吐量只有2000請求/秒(如果是10毫秒的響應時間的話,總的吞吐量應該更像是10000請求/秒)。調高並發數(比如 -c 500),服務器大多數運行良好,不過某些連接將拋出“連接被對方重置”的錯誤。由於我對低級別的socket資料了解的非常少,所以 我不能指出問題出在哪兒。

    協程堆棧
    如果你試圖用我們的調度系統建立更大的系統的話,你將很快遇到問題:我們習慣了把代碼分解為更小的函數,然後調用它們。然而, 如果使用了協程的話,就不能這麼做了。例如,看下面代碼: 復制代碼 代碼如下:
     <?php 

     function echoTimes($msg, $max) { 
         for ($i = 1; $i <= $max; ++$i) { 
             echo "$msg iteration $in"; 
             yield; 
         } 
     } 

     function task() { 
         echoTimes('foo', 10); // print foo ten times 
         echo "---n"; 
         echoTimes('bar', 5); // print bar five times 
         yield; // force it to be a coroutine 
     } 

     $scheduler = new Scheduler; 
     $scheduler->newTask(task()); 
     $scheduler->run();
     

     這段代碼試圖把重復循環“輸出n次“的代碼嵌入到一個獨立的協程裡,然後從主任務裡調用它。然而它無法運行。正如在這篇文章的開始  所提到的,調用生成器(或者協程)將沒有真正地做任何事情,它僅僅返回一個對象。這也出現在上面的例子裡。echoTimes調用除了放回一個(無用的) 協程對象外不做任何事情。 

    為了仍然允許這麼做,我們需要在這個裸協程上寫一個小小的封裝。我們將調用它:“協程堆棧”。因為它將管理嵌套的協程調用堆棧。 這將是通過生成協程來調用子協程成為可能:

     $retval = (yield someCoroutine($foo, $bar));

     使用yield,子協程也能再次返回值:

     yield retval("I'm a return value!");

      retval函數除了返回一個值的封裝外沒有做任何其他事情。這個封裝將表示它是一個返回值。

    復制代碼 代碼如下:
     <?php 

     class CoroutineReturnValue { 
         protected $value; 

         public function __construct($value) { 
             $this->value = $value; 
         } 

         public function getValue() { 
             return $this->value; 
         } 
     } 

     function retval($value) { 
         return new CoroutineReturnValue($value); 
     }

     為了把協程轉變為協程堆棧(它支持子調用),我們將不得不編寫另外一個函數(很明顯,它是另一個協程):

    復制代碼 代碼如下:
     <?php 

     function stackedCoroutine(Generator $gen) { 
         $stack = new SplStack; 

         for (;;) { 
             $value = $gen->current(); 

             if ($value instanceof Generator) { 
                 $stack->push($gen); 
                 $gen = $value; 
                 continue; 
             } 

             $isReturnValue = $value instanceof CoroutineReturnValue; 
             if (!$gen->valid() || $isReturnValue) { 
                 if ($stack->isEmpty()) { 
                     return; 
                 } 

                 $gen = $stack->pop(); 
                 $gen->send($isReturnValue ? $value->getValue() : NULL); 
                 continue; 
             } 

             $gen->send(yield $gen->key() => $value); 
         } 
     }
     
     這 個函數在調用者和當前正在運行的子協程之間扮演著簡單代理的角色。在$gen->send(yield $gen->key()=>$value);這行完成了代理功能。另外它檢查返回值是否是生成器,萬一是生成器的話,它將開始運行這個生成 器,並把前一個協程壓入堆棧裡。一旦它獲得了CoroutineReturnValue的話,它將再次請求堆棧彈出,然後繼續執行前一個協程。 

    為了使協程堆棧在任務裡可用,任務構造器裡的$this-coroutine =$coroutine;這行需要替代為$this->coroutine = StackedCoroutine($coroutine);。 
    現在我們可以稍微改進上面web服務器例子:把wait+read(和wait+write和warit+accept)這樣的動作分組為函數。為了分組相關的 功能,我將使用下面類: 復制代碼 代碼如下:
     <?php 

     class CoSocket { 
         protected $socket; 

         public function __construct($socket) { 
             $this->socket = $socket; 
         } 

         public function accept() { 
             yield waitForRead($this->socket); 
             yield retval(new CoSocket(stream_socket_accept($this->socket, 0))); 
         } 

         public function read($size) { 
             yield waitForRead($this->socket); 
             yield retval(fread($this->socket, $size)); 
         } 

         public function write($string) { 
             yield waitForWrite($this->socket); 
             fwrite($this->socket, $string); 
         } 

         public function close() { 
             @fclose($this->socket); 
         } 
     }
     
     現在服務器可以編寫的稍微簡潔點了: 復制代碼 代碼如下:
     <?php 

     function server($port) { 
         echo "Starting server at port $port...n"; 

         $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr); 
         if (!$socket) throw new Exception($errStr, $errNo); 

         stream_set_blocking($socket, 0); 

         $socket = new CoSocket($socket); 
         while (true) { 
             yield newTask( 
                 handleClient(yield $socket->accept()) 
             ); 
         } 
     } 

     function handleClient($socket) { 
         $data = (yield $socket->read(8192)); 

         $msg = "Received following request:nn$data"; 
         $msgLength = strlen($msg); 

         $response = <<<RES 
     HTTP/1.1 200 OKr 
     Content-Type: text/plainr 
     Content-Length: $msgLengthr 
     Connection: closer 
     r 
     $msg
     RES; 

         yield $socket->write($response); 
         yield $socket->close(); 
     }

    錯誤處理
    作為一個優秀的程序員,相信你已經察覺到上面的例子缺少錯誤處理。幾乎所有的 socket 都是易出錯的。我這樣做的原因一方面固然是因為錯誤處理的乏味(特別是 socket!),另一方面也在於它很容易使代碼體積膨脹。
    不過,我仍然了一講一下常見的協程錯誤處理:協程允許使用 throw() 方法在其內部拋出一個錯誤。盡管此方法還未在 PHP 中實現,但我很快就會提交它,就在今天。
    throw() 方法接受一個 Exception,並將其拋出到協程的當前懸掛點,看看下面代碼: 復制代碼 代碼如下:
     <?php 

     function gen() { 
         echo "Foon"; 
         try { 
             yield; 
         } catch (Exception $e) { 
             echo "Exception: {$e->getMessage()}n"; 
         } 
         echo "Barn"; 
     } 

     $gen = gen(); 
     $gen->rewind();                     // echos "Foo" 
     $gen->throw(new Exception('Test')); // echos "Exception: Test" 
                                         // and "Bar"
    這非常棒,因為我們可以使用系統調用以及子協程調用異常拋出。對與系統調用,Scheduler::run() 方法需要一些小調整: 復制代碼 代碼如下:
     <?php 

     if ($retval instanceof SystemCall) { 
         try { 
             $retval($task, $this); 
         } catch (Exception $e) { 
             $task->setException($e); 
             $this->schedule($task); 
         } 
         continue; 
     }
     
    Task 類也許要添加 throw 調用處理: 復制代碼 代碼如下:
     <?php 

     class Task { 
         // ... 
         protected $exception = null; 

         public function setException($exception) { 
             $this->exception = $exception; 
         } 

         public function run() { 
             if ($this->beforeFirstYield) { 
                 $this->beforeFirstYield = false; 
                 return $this->coroutine->current(); 
             } elseif ($this->exception) { 
                 $retval = $this->coroutine->throw($this->exception); 
                 $this->exception = null; 
                 return $retval; 
             } else { 
                 $retval = $this->coroutine->send($this->sendValue); 
                 $this->sendValue = null; 
                 return $retval; 
             } 
         } 

         // ... 
     }
     
    現在,我們已經可以在系統調用中使用異常拋出了!例如,要調用 killTask,讓我們在傳遞 ID 不可用時拋出一個異常: 復制代碼 代碼如下:
     <?php 

     function killTask($tid) { 
         return new SystemCall( 
             function(Task $task, Scheduler $scheduler) use ($tid) { 
                 if ($scheduler->killTask($tid)) { 
                     $scheduler->schedule($task); 
                 } else { 
                     throw new InvalidArgumentException('Invalid task ID!'); 
                 } 
             } 
         ); 
     }
    試試看: 復制代碼 代碼如下:
     <?php     
     function task() { 
         try { 
             yield killTask(500); 
         } catch (Exception $e) { 
             echo 'Tried to kill task 500 but failed: ', $e->getMessage(), "n"; 
         } 
     }
     
    這些代碼現在尚不能正常運作,因為 stackedCoroutine 函數無法正確處理異常。要修復需要做些調整: 復制代碼 代碼如下:
     <?php     
     function stackedCoroutine(Generator $gen) { 
         $stack = new SplStack; 
         $exception = null; 

         for (;;) { 
             try { 
                 if ($exception) { 
                     $gen->throw($exception); 
                     $exception = null; 
                     continue; 
                 } 

                 $value = $gen->current(); 

                 if ($value instanceof Generator) { 
                     $stack->push($gen); 
                     $gen = $value; 
                     continue; 
                 } 

                 $isReturnValue = $value instanceof CoroutineReturnValue; 
                 if (!$gen->valid() || $isReturnValue) { 
                     if ($stack->isEmpty()) { 
                         return; 
                     } 

                     $gen = $stack->pop(); 
                     $gen->send($isReturnValue ? $value->getValue() : NULL); 
                     continue; 
                 } 

                 try { 
                     $sendValue = (yield $gen->key() => $value); 
                 } catch (Exception $e) { 
                     $gen->throw($e); 
                     continue; 
                 } 

                 $gen->send($sendValue); 
             } catch (Exception $e) { 
                 if ($stack->isEmpty()) { 
                     throw $e; 
                 } 

                 $gen = $stack->pop(); 
                 $exception = $e; 
             } 
         } 
     }
     

    結束語

    在 這篇文章裡,我使用多任務協作構建了一個任務調度器,其中包括執行“系統調用”,做非阻塞操作和處理錯誤。所有這些裡真正很酷的事情是任務的結果代碼看起 來完全同步,甚至任務正在執行大量的異步操作的時候也是這樣。如果你打算從套接口讀取數據的話,你將不需要傳遞某個回調函數或者注冊一個事件偵聽器。相 反,你只要書寫yield $socket->read()。這兒大部分都是你常常也要編寫的,只在它的前面增加yield。
    當我第一次 聽到所有這一切的時候,我發現這個概念完全令人折服,而且正是這個激勵我在PHP中實現了它。同時我發現協程真正令人心慌。在令人敬畏的代碼和很大一堆代 碼之間只有單薄的一行,我認為協程正好處在這一行上。講講使用上面所述的方法書寫異步代碼是否真的有益對我來說很難。
    無論如何,我認為這是一個有趣的話題,而且我希望你也能找到它的樂趣。歡迎評論:)
    1. 上一頁:
    2. 下一頁:
    Copyright © 程式師世界 All Rights Reserved