PHP5.5一個比較好的新功能是實現對生成器和協同程序的支持。對於生成器,PHP的文檔和各種其他的博客文章(就像這一個或這一個)已經有了非常詳細的講解。協同程序相對受到的關注就少了,所以協同程序雖然有很強大的功能但也很難被知曉,解釋起來也比較困難。
這篇文章指導你通過使用協同程序來實施任務調度,通過實例實現對技術的理解。我將在前三節做一個簡單的背景介紹。如果你已經有了比較好的基礎,可以直接跳到“協同多任務處理”一節。
生成器
生成器最基本的思想也是一個函數,這個函數的返回值是依次輸出,而不是只返回一個單獨的值。或者,換句話說,生成器使你更方便的實現了迭代器接口。下面通過實現一個xrange函數來簡單說明:
復制代碼 代碼如下:
<?php
function xrange($start, $end, $step = 1) {
for ($i = $start; $i <= $end; $i += $step) {
yield $i;
}
}
foreach (xrange(1, 1000000) as $num) {
echo $num, "\n";
}
上面這個xrange()函數提供了和PHP的內建函數range()一樣的功能。但是不同的是range()函數返回的是一個包含屬組值從1到100萬的數組(注:請查看手冊)。而xrange()函數返回的是依次輸出這些值的一個迭代器,而且並不會真正以數組形式計算。
這種方法的優點是顯而易見的。它可以讓你在處理大數據集合的時候不用一次性的加載到內存中。甚至你可以處理無限大的數據流。
當然,也可以不同通過生成器來實現這個功能,而是可以通過繼承Iterator接口實現。通過使用生成器實現起來會更方便,而不用再去實現iterator接口中的5個方法了。
生成器為可中斷的函數
要從生成器認識協同程序,理解它們內部是如何工作的非常重要:生成器是可中斷的函數,在它裡面,yield構成了中斷點。
緊接著上面的例子,如果你調用xrange(1,1000000)的話,xrange()函數裡代碼沒有真正地運行。相反,PHP只是返回了一個實現了迭代器接口的 生成器類實例:
復制代碼 代碼如下:
<?php
$range = xrange(1, 1000000);
var_dump($range); // object(Generator)#1
var_dump($range instanceof Iterator); // bool(true)
你對某個對象調用迭代器方法一次,其中的代碼運行一次。例如,如果你調用$range->rewind(),那麼xrange()裡的代碼運行到控制流 第一次出現yield的地方。在這種情況下,這就意味著當$i=$start時yield $i才運行。傳遞給yield語句的值是使用$range->current()獲取的。
為了繼續執行生成器中的代碼,你必須調用$range->next()方法。這將再次啟動生成器,直到yield語句出現。因此,連續調用next()和current()方法 你將能從生成器裡獲得所有的值,直到某個點沒有再出現yield語句。對xrange()來說,這種情形出現在$i超過$end時。在這中情況下, 控制流將到達函數的終點,因此將不執行任何代碼。一旦這種情況發生,vaild()方法將返回假,這時迭代結束。
協程
協程給上面功能添加的主要東西是回送數據給生成器的能力。這將把生成器到調用者的單向通信轉變為兩者之間的雙向通信。
通過調用生成器的send()方法而不是其next()方法傳遞數據給協程。下面的logger()協程是這種通信如何運行的例子:
復制代碼 代碼如下:
<?php
function logger($fileName) {
$fileHandle = fopen($fileName, 'a');
while (true) {
fwrite($fileHandle, yield . "\n");
}
}
$logger = logger(__DIR__ . '/log');
$logger->send('Foo');
$logger->send('Bar')
正如你能看到,這兒yield沒有作為一個語句來使用,而是用作一個表達式。即它有一個返回值。yield的返回值是傳遞給send()方法的值。 在這個例子裡,yield將首先返回"Foo",然後返回"Bar"。
上面的例子裡yield僅作為接收者。混合兩種用法是可能的,即既可接收也可發送。接收和發送通信如何進行的例子如下:
復制代碼 代碼如下:
<?php
function gen() {
$ret = (yield 'yield1');
var_dump($ret);
$ret = (yield 'yield2');
var_dump($ret);
}
$gen = gen();
var_dump($gen->current()); // string(6) "yield1"
var_dump($gen->send('ret1')); // string(4) "ret1" (the first var_dump in gen)
// string(6) "yield2" (the var_dump of the ->send() return value)
var_dump($gen->send('ret2')); // string(4) "ret2" (again from within gen)
// NULL (the return value of ->send())
馬上理解輸出的精確順序有點困難,因此確定你知道為什按照這種方式輸出。我願意特別指出的有兩點:第一點,yield表達式兩邊使用 圓括號不是偶然。由於技術原因(雖然我已經考慮為賦值增加一個異常,就像Python那樣),圓括號是必須的。第二點,你可能已經注意到 調用current()之前沒有調用rewind()。如果是這麼做的,那麼已經隱含地執行了rewind操作。
多任務協作
如果閱讀了上面的logger()例子,那麼你認為“為了雙向通信我為什麼要使用協程呢? 為什麼我不能只用常見的類呢?”,你這麼問完全正確。上面的例子演示了基本用法,然而上下文中沒有真正的展示出使用協程的優點。這就是列舉許多協程例子的理由。正如上面介紹裡提到的,協程是非常強大的概念,不過這樣的應用很稀少而且常常十分復雜。給出一些簡單而真實的例子很難。
在這篇文章裡,我決定去做的是使用協程實現多任務協作。我們盡力解決的問題是你想並發地運行多任務(或者“程序”)。不過處理器在一個時刻只能運行一個任務(這篇文章的目標是不考慮多核的)。因此處理器需要在不同的任務之間進行切換,而且總是讓每個任務運行 “一小會兒”。
多任務協作這個術語中的“協作”說明了如何進行這種切換的:它要求當前正在運行的任務自動把控制傳回給調度器,這樣它就可以運行其他任務了。這與“搶占”多任務相反,搶占多任務是這樣的:調度器可以中斷運行了一段時間的任務,不管它喜歡還是不喜歡。協作多任務在Windows的早期版本(windows95)和Mac OS中有使用,不過它們後來都切換到使用搶先多任務了。理由相當明確:如果你依靠程序自動傳回 控制的話,那麼壞行為的軟件將很容易為自身占用整個CPU,不與其他任務共享。
這個時候你應當明白協程和任務調度之間的聯系:yield指令提供了任務中斷自身的一種方法,然後把控制傳遞給調度器。因此協程可以運行多個其他任務。更進一步來說,yield可以用來在任務和調度器之間進行通信。
我們的目的是 對 “任務”用更輕量級的包裝的協程函數:
復制代碼 代碼如下:
<?php
class Task {
protected $taskId;
protected $coroutine;
protected $sendValue = null;
protected $beforeFirstYield = true;
public function __construct($taskId, Generator $coroutine) {
$this->taskId = $taskId;
$this->coroutine = $coroutine;
}
public function getTaskId() {
return $this->taskId;
}
public function setSendValue($sendValue) {
$this->sendValue = $sendValue;
}
public function run() {
if ($this->beforeFirstYield) {
$this->beforeFirstYield = false;
return $this->coroutine->current();
} else {
$retval = $this->coroutine->send($this->sendValue);
$this->sendValue = null;
return $retval;
}
}
public function isFinished() {
return !$this->coroutine->valid();
}
}
一個任務是用 任務ID標記一個協程。使用setSendValue()方法,你可以指定哪些值將被發送到下次的恢復(在之後你會了解到我們需要這個)。 run()函數確實沒有做什麼,除了調用send()方法的協同程序。要理解為什麼添加beforeFirstYieldflag,需要考慮下面的代碼片段:
復制代碼 代碼如下:
<?php
function gen() {
yield 'foo';
yield 'bar';
}
$gen = gen();
var_dump($gen->send('something'));
// As the send() happens before the first yield there is an implicit rewind() call,
// so what really happens is this:
$gen->rewind();
var_dump($gen->send('something'));
// The rewind() will advance to the first yield (and ignore its value), the send() will
// advance to the second yield (and dump its value). Thus we loose the first yielded value!
通過添加 beforeFirstYieldcondition 我們可以確定 first yield 的值 被返回。
調度器現在不得不比多任務循環要做稍微多點了,然後才運行多任務:
復制代碼 代碼如下:
<?php
class Scheduler {
protected $maxTaskId = 0;
protected $taskMap = []; // taskId => task
protected $taskQueue;
public function __construct() {
$this->taskQueue = new SplQueue();
}
public function newTask(Generator $coroutine) {
$tid = ++$this->maxTaskId;
$task = new Task($tid, $coroutine);
$this->taskMap[$tid] = $task;
$this->schedule($task);
return $tid;
}
public function schedule(Task $task) {
$this->taskQueue->enqueue($task);
}
public function run() {
while (!$this->taskQueue->isEmpty()) {
$task = $this->taskQueue->dequeue();
$task->run();
if ($task->isFinished()) {
unset($this->taskMap[$task->getTaskId()]);
} else {
$this->schedule($task);
}
}
}
}
newTask()方法(使用下一個空閒的任務id)創建一個新任務,然後把這個任務放入任務映射數組裡。接著它通過把任務放入任務隊列裡來實現對任務的調度。接著run()方法掃描任務隊列,運行任務。如果一個任務結束了,那麼它將從隊列裡刪除,否則它將在隊列的末尾再次被調度。
讓我們看看下面具有兩個簡單(並且沒有什麼意義)任務的調度器:
復制代碼 代碼如下:
<?php
function task1() {
for ($i = 1; $i <= 10; ++$i) {
echo "This is task 1 iteration $i.\n";
yield;
}
}
function task2() {
for ($i = 1; $i <= 5; ++$i) {
echo "This is task 2 iteration $i.\n";
yield;
}
}
$scheduler = new Scheduler;
$scheduler->newTask(task1());
$scheduler->newTask(task2());
$scheduler->run();
兩個任務都僅僅回顯一條信息,然後使用yield把控制回傳給調度器。輸出結果如下:
復制代碼 代碼如下:
This is task 1 iteration 1.
This is task 2 iteration 1.
This is task 1 iteration 2.
This is task 2 iteration 2.
This is task 1 iteration 3.
This is task 2 iteration 3.
This is task 1 iteration 4.
This is task 2 iteration 4.
This is task 1 iteration 5.
This is task 2 iteration 5.
This is task 1 iteration 6.
This is task 1 iteration 7.
This is task 1 iteration 8.
This is task 1 iteration 9.
This is task 1 iteration 10.
輸出確實如我們所期望的:對前五個迭代來說,兩個任務是交替運行的,接著第二個任務結束後,只有第一個任務繼續運行。
與調度器之間通信
既然調度器已經運行了,那麼我們就轉向日程表的下一項:任務和調度器之間的通信。我們將使用進程用來和操作系統會話的同樣的方式來通信:系統調用。我們需要系統調用的理由是操作系統與進程相比它處在不同的權限級別上。因此為了執行特權級別的操作(如殺死另一個進程),就不得不以某種方式把控制傳回給內核,這樣內核就可以執行所說的操作了。再說一遍,這種行為在內部是通過使用中斷指令來實現的。過去使用的是通用的int指令,如今使用的是更特殊並且更快速的syscall/sysenter指令。
我們的任務調度系統將反映這種設計:不是簡單地把調度器傳遞給任務(這樣久允許它做它想做的任何事),我們將通過給yield表達式傳遞信息來與系統調用通信。這兒yield即是中斷,也是傳遞信息給調度器(和從調度器傳遞出信息)的方法。
為了說明系統調用,我將對可調用的系統調用做一個小小的封裝:
復制代碼 代碼如下:
<?php
class SystemCall {
protected $callback;
public function __construct(callable $callback) {
$this->callback = $callback;
}
public function __invoke(Task $task, Scheduler $scheduler) {
$callback = $this->callback; // Can't call it directly in PHP :/
return $callback($task, $scheduler);
}
}
它將像其他任何可調用那樣(使用_invoke)運行,不過它要求調度器把正在調用的任務和自身傳遞給這個函數。為了解決這個問題 我們不得不微微的修改調度器的run方法:
復制代碼 代碼如下:
<?php
public function run() {
while (!$this->taskQueue->isEmpty()) {
$task = $this->taskQueue->dequeue();
$retval = $task->run();
if ($retval instanceof SystemCall) {
$retval($task, $this);
continue;
}
if ($task->isFinished()) {
unset($this->taskMap[$task->getTaskId()]);
} else {
$this->schedule($task);
}
}
}
第一個系統調用除了返回任務ID外什麼都沒有做:
復制代碼 代碼如下:
<?php
function getTaskId() {
return new SystemCall(function(Task $task, Scheduler $scheduler) {
$task->setSendValue($task->getTaskId());
$scheduler->schedule($task);
});
}
這個函數確實設置任務id為下一次發送的值,並再次調度了這個任務。由於使用了系統調用,所以調度器不能自動調用任務,我們需要手工調度任務(稍後你將明白為什麼這麼做)。要使用這個新的系統調用的話,我們要重新編寫以前的例子:
復制代碼 代碼如下:
<?php
function task($max) {
$tid = (yield getTaskId()); // <-- here's the syscall!
for ($i = 1; $i <= $max; ++$i) {
echo "This is task $tid iteration $i.\n";
yield;
}
}
$scheduler = new Scheduler;
$scheduler->newTask(task(10));
$scheduler->newTask(task(5));
$scheduler->run();
這段代碼將給出與前一個例子相同的輸出。注意系統調用同其他任何調用一樣正常地運行,不過預先增加了yield。要創建新的任務,然後再殺死它們的話,需要兩個以上的系統調用:
復制代碼 代碼如下:
<?php
function newTask(Generator $coroutine) {
return new SystemCall(
function(Task $task, Scheduler $scheduler) use ($coroutine) {
$task->setSendValue($scheduler->newTask($coroutine));
$scheduler->schedule($task);
}
);
}
function killTask($tid) {
return new SystemCall(
function(Task $task, Scheduler $scheduler) use ($tid) {
$task->setSendValue($scheduler->killTask($tid));
$scheduler->schedule($task);
}
);
}
killTask函數需要在調度器裡增加一個方法:
復制代碼 代碼如下:
<?php
public function killTask($tid) {
if (!isset($this->taskMap[$tid])) {
return false;
}
unset($this->taskMap[$tid]);
// This is a bit ugly and could be optimized so it does not have to walk the queue,
// but assuming that killing tasks is rather rare I won't bother with it now
foreach ($this->taskQueue as $i => $task) {
if ($task->getTaskId() === $tid) {
unset($this->taskQueue[$i]);
break;
}
}
return true;
}
當前1/2頁 12下一頁閱讀全文