PHP5.5一個比較好的新功能是實現對生成器和協同程序的支持。對於生成器,PHP的文檔和各種其他的博客文章(就像這一個或這一個)已經有了非常詳細的講解。協同程序相對受到的關注就少了,所以協同程序雖然有很強大的功能但也很難被知曉,解釋起來也比較困難。
這篇文章指導你通過使用協同程序來實施任務調度,通過實例實現對技術的理解。我將在前三節做一個簡單的背景介紹。如果你已經有了比較好的基礎,可以直接跳到“協同多任務處理”一節。
生成器
生成器最基本的思想也是一個函數,這個函數的返回值是依次輸出,而不是只返回一個單獨的值。或者,換句話說,生成器使你更方便的實現了迭代器接口。下面通過實現一個xrange函數來簡單說明:
復制代碼 代碼如下:foreach (xrange(1, 1000000) as $num) {
echo $num, "n";
}
上面這個xrange()函數提供了和PHP的內建函數range()一樣的功能。但是不同的是range()函數返回的是一個包含屬組值從1到 100萬的數組(注:請查看手冊)。而xrange()函數返回的是依次輸出這些值的一個迭代器,而且並不會真正以數組形式計算。
這種方法的優點是顯而易見的。它可以讓你在處理大數據集合的時候不用一次性的加載到內存中。甚至你可以處理無限大的數據流。
當然,也可以不同通過生成器來實現這個功能,而是可以通過繼承Iterator接口實現。通過使用生成器實現起來會更方便,而不用再去實現iterator接口中的5個方法了。
生成器為可中斷的函數
要從生成器認識協同程序,理解它們內部是如何工作的非常重要:生成器是可中斷的函數,在它裡面,yield構成了中斷點。
緊接著上面的例子,如果你調用xrange(1,1000000)的話,xrange()函數裡代碼沒有真正地運行。相反,PHP只是返回了一個實現了迭代器接口的 生成器類實例:
你對某個對象調用迭代器方法一次,其中的代碼運行一次。例如,如果你調用$range->rewind(),那麼xrange()裡的代碼運 行到控制流 第一次出現yield的地方。在這種情況下,這就意味著當$i=$start時yield $i才運行。傳遞給yield語句的值是使用$range->current()獲取的。
為了繼續執行生成器中的代碼,你必須 調用$range->next()方法。這將再次啟動生成器,直到yield語句出現。因此,連續調用next()和current()方法 你將能從生成器裡獲得所有的值,直到某個點沒有再出現yield語句。對xrange()來說,這種情形出現在$i超過$end時。在這中情況下, 控制流將到達函數的終點,因此將不執行任何代碼。一旦這種情況發生,vaild()方法將返回假,這時迭代結束。
協程
協程給上面功能添加的主要東西是回送數據給生成器的能力。這將把生成器到調用者的單向通信轉變為兩者之間的雙向通信。
通過調用生成器的send()方法而不是其next()方法傳遞數據給協程。下面的logger()協程是這種通信如何運行的例子:
function logger($fileName) {
$fileHandle = fopen($fileName, 'a');
while (true) {
fwrite($fileHandle, yield . "n");
}
}
$logger = logger(__DIR__ . '/log');
$logger->send('Foo');
$logger->send('Bar')
正如你能看到,這兒yield沒有作為一個語句來使用,而是用作一個表達式。即它有一個返回值。yield的返回值是傳遞給send()方法的值。 在這個例子裡,yield將首先返回"Foo",然後返回"Bar"。
上面的例子裡yield僅作為接收者。混合兩種用法是可能的,即既可接收也可發送。接收和發送通信如何進行的例子如下:
復制代碼 代碼如下:function gen() {
$ret = (yield 'yield1');
var_dump($ret);
$ret = (yield 'yield2');
var_dump($ret);
}
$gen = gen();
var_dump($gen->current()); // string(6) "yield1"
var_dump($gen->send('ret1')); // string(4) "ret1" (the first var_dump in gen)
// string(6) "yield2" (the var_dump of the ->send() return value)
var_dump($gen->send('ret2')); // string(4) "ret2" (again from within gen)
// NULL (the return value of ->send())
馬上理解輸出的精確順序有點困難,因此確定你知道為什按照這種方式輸出。我願意特別指出的有兩點:第一點,yield表達式兩邊使用 圓括號不是偶然。由於技術原因(雖然我已經考慮為賦值增加一個異常,就像Python那樣),圓括號是必須的。第二點,你可能已經注意到 調用current()之前沒有調用rewind()。如果是這麼做的,那麼已經隱含地執行了rewind操作。
多任務協作
如果閱讀了上面的logger()例子,那麼你認為“為了雙向通信我為什麼要使用協程呢? 為什麼我不能只用常見的類呢?”,你這麼問完全正確。上面的例子演示了基本用法,然而上下文中沒有真正的展示出使用協程的優點。這就是列舉許多協程例子的 理由。正如上面介紹裡提到的,協程是非常強大的概念,不過這樣的應用很稀少而且常常十分復雜。給出一些簡單而真實的例子很難。
在這篇文章裡,我決定去做的是使用協程實現多任務協作。我們盡力解決的問題是你想並發地運行多任務(或者“程序”)。不過處理器在一個時刻只能運行 一個任務(這篇文章的目標是不考慮多核的)。因此處理器需要在不同的任務之間進行切換,而且總是讓每個任務運行 “一小會兒”。
多任務協作這個術語中的“協作”說明了如何進行這種切換的:它要求當前正在運行的任務自動把控制傳回給調度器,這樣它就可以運行其他任務了。這與 “搶占”多任務相反,搶占多任務是這樣的:調度器可以中斷運行了一段時間的任務,不管它喜歡還是不喜歡。協作多任務在Windows的早期版本 (windows95)和Mac OS中有使用,不過它們後來都切換到使用搶先多任務了。理由相當明確:如果你依靠程序自動傳回 控制的話,那麼壞行為的軟件將很容易為自身占用整個CPU,不與其他任務共享。
這個時候你應當明白協程和任務調度之間的聯系:yield指令提供了任務中斷自身的一種方法,然後把控制傳遞給調度器。因此協程可以運行多個其他任務。更進一步來說,yield可以用來在任務和調度器之間進行通信。
我們的目的是 對 “任務”用更輕量級的包裝的協程函數:
class Task {
protected $taskId;
protected $coroutine;
protected $sendValue = null;
protected $beforeFirstYield = true;
public function __construct($taskId, Generator $coroutine) {
$this->taskId = $taskId;
$this->coroutine = $coroutine;
}
public function getTaskId() {
return $this->taskId;
}
public function setSendValue($sendValue) {
$this->sendValue = $sendValue;
}
public function run() {
if ($this->beforeFirstYield) {
$this->beforeFirstYield = false;
return $this->coroutine->current();
} else {
$retval = $this->coroutine->send($this->sendValue);
$this->sendValue = null;
return $retval;
}
}
public function isFinished() {
return !$this->coroutine->valid();
}
}
一個任務是用 任務ID標記一個協程。使用setSendValue()方法,你可以指定哪些值將被發送到下次的恢復(在之後你會了解到我們需要這個)。 run()函數確實沒有做什麼,除了調用send()方法的協同程序。要理解為什麼添加beforeFirstYieldflag,需要考慮下面的代碼片 段:
復制代碼 代碼如下:function gen() {
yield 'foo';
yield 'bar';
}
$gen = gen();
var_dump($gen->send('something'));
// As the send() happens before the first yield there is an implicit rewind() call,
// so what really happens is this:
$gen->rewind();
var_dump($gen->send('something'));
// The rewind() will advance to the first yield (and ignore its value), the send() will
// advance to the second yield (and dump its value). Thus we loose the first yielded value!
通過添加 beforeFirstYieldcondition 我們可以確定 first yield 的值 被返回。
調度器現在不得不比多任務循環要做稍微多點了,然後才運行多任務:
class Scheduler {
protected $maxTaskId = 0;
protected $taskMap = []; // taskId => task
protected $taskQueue;
public function __construct() {
$this->taskQueue = new SplQueue();
}
public function newTask(Generator $coroutine) {
$tid = ++$this->maxTaskId;
$task = new Task($tid, $coroutine);
$this->taskMap[$tid] = $task;
$this->schedule($task);
return $tid;
}
public function schedule(Task $task) {
$this->taskQueue->enqueue($task);
}
public function run() {
while (!$this->taskQueue->isEmpty()) {
$task = $this->taskQueue->dequeue();
$task->run();
if ($task->isFinished()) {
unset($this->taskMap[$task->getTaskId()]);
} else {
$this->schedule($task);
}
}
}
}
newTask()方法(使用下一個空閒的任務id)創建一個新任務,然後把這個任務放入任務映射數組裡。接著它通過把任務放入任務隊列裡來實現 對任務的調度。接著run()方法掃描任務隊列,運行任務。如果一個任務結束了,那麼它將從隊列裡刪除,否則它將在隊列的末尾再次被調度。
讓我們看看下面具有兩個簡單(並且沒有什麼意義)任務的調度器:
function task1() {
for ($i = 1; $i <= 10; ++$i) {
echo "This is task 1 iteration $i.n";
yield;
}
}
function task2() {
for ($i = 1; $i <= 5; ++$i) {
echo "This is task 2 iteration $i.n";
yield;
}
}
$scheduler = new Scheduler;
$scheduler->newTask(task1());
$scheduler->newTask(task2());
$scheduler->run();
兩個任務都僅僅回顯一條信息,然後使用yield把控制回傳給調度器。輸出結果如下:
復制代碼 代碼如下:
輸出確實如我們所期望的:對前五個迭代來說,兩個任務是交替運行的,接著第二個任務結束後,只有第一個任務繼續運行。
與調度器之間通信
既然調度器已經運行了,那麼我們就轉向日程表的下一項:任務和調度器之間的通信。我們將使用進程用來和操作系統會話的同樣的方式來通信:系統調用。 我們需要系統調用的理由是操作系統與進程相比它處在不同的權限級別上。因此為了執行特權級別的操作(如殺死另一個進程),就不得不以某種方式把控制傳回給 內核,這樣內核就可以執行所說的操作了。再說一遍,這種行為在內部是通過使用中斷指令來實現的。過去使用的是通用的int指令,如今使用的是更特殊並且更 快速的syscall/sysenter指令。
我們的任務調度系統將反映這種設計:不是簡單地把調度器傳遞給任務(這樣久允許它做它想做的任何事),我們將通過給yield表達式傳遞信息來與系統調用通信。這兒yield即是中斷,也是傳遞信息給調度器(和從調度器傳遞出信息)的方法。
為了說明系統調用,我將對可調用的系統調用做一個小小的封裝:
class SystemCall {
protected $callback;
public function __construct(callable $callback) {
$this->callback = $callback;
}
public function __invoke(Task $task, Scheduler $scheduler) {
$callback = $this->callback; // Can't call it directly in PHP :/
return $callback($task, $scheduler);
}
}
它將像其他任何可調用那樣(使用_invoke)運行,不過它要求調度器把正在調用的任務和自身傳遞給這個函數。為了解決這個問題 我們不得不微微的修改調度器的run方法:
復制代碼 代碼如下: if ($retval instanceof SystemCall) {
$retval($task, $this);
continue;
}
if ($task->isFinished()) {
unset($this->taskMap[$task->getTaskId()]);
} else {
$this->schedule($task);
}
}
}
第一個系統調用除了返回任務ID外什麼都沒有做:
復制代碼 代碼如下:這個函數確實設置任務id為下一次發送的值,並再次調度了這個任務。由於使用了系統調用,所以調度器不能自動調用任務,我們需要手工調度任務(稍後你將明白為什麼這麼做)。要使用這個新的系統調用的話,我們要重新編寫以前的例子:
復制代碼 代碼如下:function task($max) {
$tid = (yield getTaskId()); // <-- here's the syscall!
for ($i = 1; $i <= $max; ++$i) {
echo "This is task $tid iteration $i.n";
yield;
}
}
$scheduler = new Scheduler;
$scheduler->newTask(task(10));
$scheduler->newTask(task(5));
$scheduler->run();
這段代碼將給出與前一個例子相同的輸出。注意系統調用同其他任何調用一樣正常地運行,不過預先增加了yield。要創建新的任務,然後再殺死它們的話,需要兩個以上的系統調用:
復制代碼 代碼如下:function newTask(Generator $coroutine) {
return new SystemCall(
function(Task $task, Scheduler $scheduler) use ($coroutine) {
$task->setSendValue($scheduler->newTask($coroutine));
$scheduler->schedule($task);
}
);
}
function killTask($tid) {
return new SystemCall(
function(Task $task, Scheduler $scheduler) use ($tid) {
$task->setSendValue($scheduler->killTask($tid));
$scheduler->schedule($task);
}
);
}
killTask函數需要在調度器裡增加一個方法:
復制代碼 代碼如下:public function killTask($tid) {
if (!isset($this->taskMap[$tid])) {
return false;
}
unset($this->taskMap[$tid]);
// This is a bit ugly and could be optimized so it does not have to walk the queue,
// but assuming that killing tasks is rather rare I won't bother with it now
foreach ($this->taskQueue as $i => $task) {
if ($task->getTaskId() === $tid) {
unset($this->taskQueue[$i]);
break;
}
}
return true;
}
用來測試新功能的微腳本:
復制代碼 代碼如下:function childTask() {
$tid = (yield getTaskId());
while (true) {
echo "Child task $tid still alive!n";
yield;
}
}
function task() {
$tid = (yield getTaskId());
$childTid = (yield newTask(childTask()));
for ($i = 1; $i <= 6; ++$i) {
echo "Parent task $tid iteration $i.n";
yield;
if ($i == 3) yield killTask($childTid);
}
}
$scheduler = new Scheduler;
$scheduler->newTask(task());
$scheduler->run();
這段代碼將打印以下信息:
復制代碼 代碼如下:經過三次迭代以後子任務將被殺死,因此這就是"Child is still alive"消息結束的時候。可能應當指出的是這不是真正的父子關系。 因為甚至在父任務結束後子任務仍然可以運行。或者子任務可以殺死父任務。可以修改調度器使它具有更層級化的任務結構,不過 在這篇文章裡我沒有這麼做。
你可以實現許多進程管理調用。例如 wait(它一直等待到任務結束運行時),exec(它替代當前任務)和fork(它創建一個 當前任務的克隆)。fork非常酷,而且你可以使用PHP的協程真正地實現它,因為它們都支持克隆。
然而讓我們把這些留給有興趣的讀者吧,我們去看下一個議題。
幾點人
翻譯於 4天前
0人頂
頂 翻譯的不錯哦!
非阻塞IO
很明顯,我們的任務管理系統的真正很酷的應用是web服務器。它有一個任務是在套接字上偵聽是否有新連接,當有新連接要建立的時候 ,它創建一個新任務來處理新連接。
web服務器最難的部分通常是像讀數據這樣的套接字操作是阻塞的。例如PHP將等待到客戶端完成發送為止。對一個WEB服務器來說,這 根本不行;這就意味著服務器在一個時間點上只能處理一個連接。
解決方案是確保在真正對套接字讀寫之前該套接字已經“准備就緒”。為了查找哪個套接字已經准備好讀或者寫了,可以使用 流選擇函數。
首先,讓我們添加兩個新的 syscall,它們將等待直到指定 socket 准備好:
復制代碼 代碼如下:需要在某個地方注冊這個任務,例如,你可以在run()方法的開始增 加$this->newTask($this->ioPollTask())。然後就像其他 任務一樣每執行完整任務循環一次就執行輪詢操作一次(這麼做一定不是最好的方法)。ioPollTask將使用0秒的超時來調用ioPoll, 這意味著stream_select將立即返回(而不是等待)。
只有任務隊列為空時,我們才使用null超時,這意味著它一直等到某個套接口准備就緒。如果我們沒有這麼做,那麼輪詢任務將一而再, 再而三的循環運行,直到有新的連接建立。這將導致100%的CPU利用率。相反,讓操作系統做這種等待會更有效。
現在編寫服務器相對容易了:
這段代碼試圖把重復循環“輸出n次“的代碼嵌入到一個獨立的協程裡,然後從主任務裡調用它。然而它無法運行。正如在這篇文章的開始 所提到的,調用生成器(或者協程)將沒有真正地做任何事情,它僅僅返回一個對象。這也出現在上面的例子裡。echoTimes調用除了放回一個(無用的) 協程對象外不做任何事情。
為了仍然允許這麼做,我們需要在這個裸協程上寫一個小小的封裝。我們將調用它:“協程堆棧”。因為它將管理嵌套的協程調用堆棧。 這將是通過生成協程來調用子協程成為可能:
$retval = (yield someCoroutine($foo, $bar));
使用yield,子協程也能再次返回值:
yield retval("I'm a return value!");
retval函數除了返回一個值的封裝外沒有做任何其他事情。這個封裝將表示它是一個返回值。
復制代碼 代碼如下:為了把協程轉變為協程堆棧(它支持子調用),我們將不得不編寫另外一個函數(很明顯,它是另一個協程):
復制代碼 代碼如下: