安裝 pthreads 基本上需要重新編譯 PHP,並加上 --enable-maintainer-zts 參數。但是使用這個擴展的文檔很少,bug 很多,還會遇到很多意想不到的問題,生產環境上只能呵呵了。所以這個東西玩玩就算了,真正的多線程還是用 Python、C 等等。
以下代碼大部分來自網絡
這裡使用的是 php-7.0.2
# Build PHP 7 with thread safety (ZTS) enabled, which pthreads requires.
# Each option sits on its own line with a real line continuation, and the
# build/install step is a separate command rather than an argument to configure.
# --with-mysql is omitted: the old mysql extension was removed in PHP 7.0,
# so configure no longer recognises that option (mysqli and pdo_mysql remain).
./configure \
    --prefix=/usr/local/php7 \
    --with-config-file-path=/etc \
    --with-config-file-scan-dir=/etc/php.d \
    --enable-debug \
    --enable-maintainer-zts \
    --enable-pcntl \
    --enable-fpm \
    --enable-opcache \
    --enable-embed=shared \
    --enable-json=shared \
    --enable-phpdbg \
    --with-curl=shared \
    --with-mysqli=/usr/local/mysql/bin/mysql_config \
    --with-pdo-mysql
make && make install
安裝pthreads
# Install the pthreads extension through PECL; it needs the ZTS build of PHP configured above.
pecl install pthreads
<?php
// Example 1: an anonymous Thread subclass that prints its own thread id.
$thread = new class extends Thread {
    public function run()
    {
        echo "Hello World {$this->getThreadId()}\n";
    }
};

// Only wait for the thread when it was actually started.
if ($thread->start()) {
    $thread->join();
}

// Example 2: fifty threads, each printing its own number once per second, forever.
class workerThread extends Thread
{
    /**
     * @param int $i Number this thread keeps printing.
     */
    public function __construct($i)
    {
        $this->i = $i;
    }

    public function run()
    {
        for (;;) {
            echo $this->i . "\n";
            sleep(1);
        }
    }
}

$i = 0;
while ($i < 50) {
    ($workers[$i] = new workerThread($i))->start();
    $i++;
}
?>
Stackables are tasks that are executed by Worker threads. You can synchronize with, read, and write Stackable objects before, after and during their execution.
<?php
/**
 * A task that runs one SQL statement on the connection of the Worker executing it
 * and prints every row of the result.
 */
class SQLQuery extends Stackable
{
    /**
     * @param string $sql SQL text to execute when the task runs.
     */
    public function __construct($sql)
    {
        $this->sql = $sql;
    }

    public function run()
    {
        $dbh = $this->worker->getConnection();
        $result = $dbh->query($this->sql);
        if ($result === false) {
            // PDO::query() returns false on failure unless the connection is in
            // exception mode; report it instead of calling fetch() on a boolean.
            $info = $dbh->errorInfo();
            printf("Query failed: %s (%s)\n", $this->sql, $info[2]);
            return;
        }
        while ($member = $result->fetch(PDO::FETCH_ASSOC)) {
            print_r($member);
        }
    }
}

/**
 * Worker that opens one PDO connection in run() and hands it to the tasks stacked on it.
 */
class ExampleWorker extends Worker
{
    /** @var PDO Connection created by run(). */
    public static $dbh;

    /**
     * @param string $name Accepted for labelling purposes; not used by this class.
     */
    public function __construct($name)
    {
    }

    public function run()
    {
        self::$dbh = new PDO('mysql:host=10.0.0.30;dbname=testdb', 'root', '123456');
    }

    /**
     * @return PDO The connection created by run().
     */
    public function getConnection()
    {
        return self::$dbh;
    }
}

$worker = new ExampleWorker("My Worker Thread");

$sql1 = new SQLQuery('select * from test order by id desc limit 1,5');
$worker->stack($sql1);

$sql2 = new SQLQuery('select * from test order by id desc limit 5,5');
$worker->stack($sql2);

$worker->start();
$worker->shutdown();
?>
什麼情況下會用到互斥鎖?在你需要控制多個線程同一時刻只能有一個線程工作的情況下可以使用。一個簡單的計數器程序,說明有無互斥鎖情況下的不同
<?php
// Seed the counter file with 0.
$counter = 0;
$handle = fopen("/tmp/counter.txt", "w");
fwrite($handle, (string) $counter);
fclose($handle);

/**
 * Reads the number stored in /tmp/counter.txt, increments it and writes it back.
 * When a mutex handle is supplied, the whole read-increment-write sequence runs
 * while holding that mutex.
 *
 * NOTE(review): Mutex belongs to the older pthreads API; confirm it exists in the
 * installed pthreads version.
 */
class CounterThread extends Thread
{
    /**
     * @param mixed $mutex Handle returned by Mutex::create(), or null to run unguarded.
     */
    public function __construct($mutex = null)
    {
        $this->mutex = $mutex;
    }

    public function run()
    {
        if ($this->mutex) {
            Mutex::lock($this->mutex);
        }
        try {
            // Open the file here, inside the thread, in "r+" mode: the previous "w+"
            // in the constructor truncated the file every time a thread object was
            // created, wiping the counter before anyone could read it.
            $handle = fopen("/tmp/counter.txt", "r+");
            if ($handle === false) {
                printf("Thread #%lu could not open the counter file\n", $this->getThreadId());
                return;
            }
            $counter = intval(fgets($handle));
            $counter++;
            rewind($handle);
            fputs($handle, (string) $counter);
            fclose($handle);
            printf("Thread #%lu says: %s\n", $this->getThreadId(), $counter);
        } finally {
            // Always release the mutex, including on the early return above.
            if ($this->mutex) {
                Mutex::unlock($this->mutex);
            }
        }
    }
}

// Without a mutex: the threads race on the file.
$unguarded = [];
for ($i = 0; $i < 50; $i++) {
    $unguarded[$i] = new CounterThread();
    $unguarded[$i]->start();
}
// Join this batch too; previously these objects were overwritten and never joined.
foreach ($unguarded as $thread) {
    $thread->join();
}

// With a mutex: created locked, so no thread proceeds until it is released below.
$mutex = Mutex::create(true);
$threads = [];
for ($i = 0; $i < 50; $i++) {
    $threads[$i] = new CounterThread($mutex);
    $threads[$i]->start();
}
Mutex::unlock($mutex);
foreach ($threads as $thread) {
    $thread->join();
}
Mutex::destroy($mutex);
?>
在共享內存的例子中,沒有使用任何鎖,仍然可能正常工作,可能共享內存操作本身具備鎖的功能。不過「讀取、加一、寫回」這三步並不是一個原子操作,嚴格來說仍應加鎖。
<?php
// tempnam() expects a directory as its first argument; use the system temp
// directory rather than the path of this script file.
$tmp = tempnam(sys_get_temp_dir(), 'PHP');
$key = ftok($tmp, 'a');
$shmid = shm_attach($key);

// Slot 1 of the shared memory segment holds the counter.
$counter = 0;
shm_put_var($shmid, 1, $counter);

/**
 * Increments the counter stored in slot 1 of the given shared memory segment
 * and prints the value it wrote.
 */
class CounterThread extends Thread
{
    /**
     * @param mixed $shmid Shared memory handle returned by shm_attach().
     */
    public function __construct($shmid)
    {
        $this->shmid = $shmid;
    }

    public function run()
    {
        // No lock is taken around this read-increment-write sequence (on purpose,
        // to match the accompanying text), so concurrent threads may overwrite
        // each other's update.
        $counter = shm_get_var($this->shmid, 1);
        $counter++;
        shm_put_var($this->shmid, 1, $counter);
        printf("Thread #%lu says: %s\n", $this->getThreadId(), $counter);
    }
}

// Create all threads first, then start them, then wait for all of them.
$threads = [];
for ($i = 0; $i < 100; $i++) {
    $threads[] = new CounterThread($shmid);
}
foreach ($threads as $thread) {
    $thread->start();
}
foreach ($threads as $thread) {
    $thread->join();
}

shm_remove($shmid);
shm_detach($shmid);
// Remove the temp file that only served as the ftok() key source.
unlink($tmp);
?>
有些場景我們不希望 $thread->start() 就開始運行程序,而是希望線程等待我們的命令。$thread->wait(); 的作用是 $thread->start() 後線程並不會立即運行,只有收到 $thread->notify(); 發出的信號後才運行
<?php
// tempnam() expects a directory as its first argument; use the system temp
// directory rather than the path of this script file.
$tmp = tempnam(sys_get_temp_dir(), 'PHP');
$key = ftok($tmp, 'a');
$shmid = shm_attach($key);

// Slot 1 of the shared memory segment holds the counter.
$counter = 0;
shm_put_var($shmid, 1, $counter);

/**
 * Waits until the creating context gives the go-ahead, then increments the
 * counter stored in slot 1 of the shared memory segment and prints it.
 */
class CounterThread extends Thread
{
    /**
     * @param mixed $shmid Shared memory handle returned by shm_attach().
     */
    public function __construct($shmid)
    {
        $this->shmid = $shmid;
        // Set to true by the notifier; guards against a notification that is
        // sent before this thread reaches wait().
        $this->ready = false;
    }

    public function run()
    {
        $this->synchronized(function ($thread) {
            // Wait only while the go-ahead has not been given. A bare wait()
            // would block forever if notify() had already been called.
            while (!$thread->ready) {
                $thread->wait();
            }
        }, $this);

        $counter = shm_get_var($this->shmid, 1);
        $counter++;
        shm_put_var($this->shmid, 1, $counter);
        printf("Thread #%lu says: %s\n", $this->getThreadId(), $counter);
    }
}

$threads = [];
for ($i = 0; $i < 100; $i++) {
    $threads[] = new CounterThread($shmid);
}
foreach ($threads as $thread) {
    $thread->start();
}

// Release the threads one by one: set the flag and notify inside the same
// synchronized block so the waiting thread cannot miss the change.
foreach ($threads as $thread) {
    $thread->synchronized(function ($thread) {
        $thread->ready = true;
        $thread->notify();
    }, $thread);
}

foreach ($threads as $thread) {
    $thread->join();
}

shm_remove($shmid);
shm_detach($shmid);
// Remove the temp file that only served as the ftok() key source.
unlink($tmp);
?>
一個Pool類
<?php
/**
 * Thread that decrypts one member's bank number and prints the UPDATE statement
 * that would store it. Rows whose encrypted value is not longer than 100 bytes
 * are written to bankno_error.log instead.
 */
class Update extends Thread
{
    public $running = false;
    public $row = array();

    /**
     * @param array $row Row with the keys 'id' and 'bankno'.
     */
    public function __construct($row)
    {
        $this->row = $row;
        $this->sql = null;
    }

    public function run()
    {
        // Start from an empty value so the length check below never reads an
        // undefined variable when the row is rejected.
        $bankno = '';

        if (strlen($this->row['bankno']) > 100) {
            // safenet_decrypt() is expected to be provided by the surrounding project.
            $bankno = safenet_decrypt($this->row['bankno']);
        } else {
            $error = sprintf("%s, %s\r\n", $this->row['id'], $this->row['bankno']);
            file_put_contents("bankno_error.log", $error, FILE_APPEND);
        }

        if (strlen($bankno) > 7) {
            // The statement is only printed here, not executed.
            $this->sql = sprintf(
                "update members set bankno = '%s' where id = '%s';",
                $bankno,
                $this->row['id']
            );
        }
        printf("%s\n", $this->sql);
    }
}

/**
 * Fixed-capacity batch of Update threads: fill it, start all, join all, clean.
 *
 * NOTE(review): pthreads itself provides a class named Pool (a later example in
 * this document uses it); if the loaded pthreads version defines it, this
 * declaration will conflict and the class needs another name - TODO confirm.
 */
class Pool
{
    public $pool = array();
    public $count;

    /**
     * @param int $count Maximum number of threads held at once.
     */
    public function __construct($count)
    {
        $this->count = $count;
    }

    /**
     * Queues a row as a new, not yet started thread.
     *
     * @param array $row Row handed to Update.
     * @return bool False when the pool is already full.
     */
    public function push($row)
    {
        if (count($this->pool) >= $this->count) {
            return false;
        }
        $this->pool[] = new Update($row);
        return true;
    }

    /** Starts every thread currently held. */
    public function start()
    {
        foreach ($this->pool as $worker) {
            $worker->start();
        }
    }

    /** Waits for every thread currently held. */
    public function join()
    {
        foreach ($this->pool as $worker) {
            $worker->join();
        }
    }

    /** Drops the threads that are no longer running. */
    public function clean()
    {
        foreach ($this->pool as $id => $worker) {
            if (!$worker->isRunning()) {
                unset($this->pool[$id]);
            }
        }
    }
}

try {
    // $dbhost, $dbname, $dbuser and $dbpw are expected to be defined before this point.
    $dbh = new PDO(
        "mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname",
        $dbuser,
        $dbpw,
        array(
            PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
            PDO::MYSQL_ATTR_COMPRESS => true
        )
    );

    $sql = "select id,bankno from members order by id desc";
    $row = $dbh->query($sql);
    if ($row === false) {
        // Surface a failed query through the catch below instead of a fatal
        // error on fetch().
        throw new RuntimeException('query failed: ' . $sql);
    }

    $pool = new Pool(5);

    while ($member = $row->fetch(PDO::FETCH_ASSOC)) {
        // Try to queue the row; when the pool is full, run the whole batch
        // and empty the pool, then try again.
        while (!$pool->push($member)) {
            $pool->start();
            $pool->join();
            $pool->clean();
        }
    }

    // Run whatever is left in the last, possibly partial batch.
    $pool->start();
    $pool->join();

    $dbh = null;
} catch (Exception $e) {
    echo '[' , date('H:i:s') , ']', '系統錯誤', $e->getMessage(), "\n";
}
?>
上面的例子是當線程池滿後執行start統一啟動,下面的例子是只要線程池中有空閒便立即創建新線程。
<?php
/**
 * Thread that decrypts one member's bank number and prints the UPDATE statement
 * that would store it. Rows whose encrypted value is not longer than 100 bytes
 * are written to bankno_error.log instead.
 */
class Update extends Thread
{
    public $running = false;
    public $row = array();

    /**
     * @param array $row Row with the keys 'id' and 'bankno'.
     */
    public function __construct($row)
    {
        $this->row = $row;
        $this->sql = null;
    }

    public function run()
    {
        // Start from an empty value so the length check below never reads an
        // undefined variable when the row is rejected.
        $bankno = '';

        if (strlen($this->row['bankno']) > 100) {
            // safenet_decrypt() is expected to be provided by the surrounding project.
            $bankno = safenet_decrypt($this->row['bankno']);
        } else {
            $error = sprintf("%s, %s\r\n", $this->row['id'], $this->row['bankno']);
            file_put_contents("bankno_error.log", $error, FILE_APPEND);
        }

        if (strlen($bankno) > 7) {
            // The statement is only printed here, not executed.
            $this->sql = sprintf(
                "update members set bankno = '%s' where id = '%s';",
                $bankno,
                $this->row['id']
            );
        }
        printf("%s\n", $this->sql);
    }
}

try {
    // $dbhost, $dbname, $dbuser and $dbpw are expected to be defined before this point.
    $dbh = new PDO(
        "mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname",
        $dbuser,
        $dbpw,
        array(
            PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
            PDO::MYSQL_ATTR_COMPRESS => true
        )
    );

    $sql = "select id,bankno from members order by id desc limit 50";
    $row = $dbh->query($sql);
    if ($row === false) {
        // Surface a failed query through the catch below instead of a fatal
        // error on fetch().
        throw new RuntimeException('query failed: ' . $sql);
    }

    // Running threads, keyed by member id; at most 5 at any time.
    $pool = array();

    while ($member = $row->fetch(PDO::FETCH_ASSOC)) {
        $id = $member['id'];

        // Wait for a free slot.
        while (count($pool) >= 5) {
            foreach ($pool as $name => $worker) {
                if (!$worker->isRunning()) {
                    // Join before dropping the object so the thread is fully
                    // finished and cleaned up.
                    $worker->join();
                    unset($pool[$name]);
                }
            }
            if (count($pool) >= 5) {
                // Nothing finished yet: back off briefly instead of spinning
                // at full CPU.
                usleep(10000);
            }
        }

        $pool[$id] = new Update($member);
        $pool[$id]->start();
    }

    // Wait for the threads that are still running after the last row.
    foreach ($pool as $worker) {
        $worker->join();
    }

    $dbh = null;
} catch (Exception $e) {
    echo '【' , date('H:i:s') , '】', '【系統錯誤】', $e->getMessage(), "\n";
}
?>
<?php
/**
 * Pool worker that carries the logger shared by all tasks it executes.
 */
class WebWorker extends Worker
{
    // The declaration used to be misspelled "$loger", so the property the
    // constructor assigns was never declared. It is read from WebWork::run()
    // through $this->worker->logger, hence it is declared public.
    public $logger;

    public function __construct(SafeLog $logger)
    {
        $this->logger = $logger;
    }
}

/**
 * Task that logs which thread executes it and then marks itself complete.
 */
class WebWork extends Stackable
{
    protected $complete;

    /**
     * @return bool|null True once run() has finished; null before that.
     */
    public function isComplete()
    {
        return $this->complete;
    }

    public function run()
    {
        $this->worker
            ->logger
            ->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId());
        $this->complete = true;
    }
}

/**
 * Logger object handed to every worker of the pool.
 */
class SafeLog extends Stackable
{
    /**
     * Prints a printf-style message followed by a newline.
     *
     * The first call argument is the format; every further argument is a value
     * for it (the arguments are taken from func_get_args(), not from $args).
     *
     * NOTE(review): declared protected yet called from WebWork::run(); kept as in
     * the original - verify this is valid for the pthreads version in use.
     */
    protected function log($message, $args = [])
    {
        $args = func_get_args();
        if (($message = array_shift($args))) {
            echo vsprintf("{$message}\n", $args);
        }
    }
}

$pool = new Pool(8, \WebWorker::class, [new SafeLog()]);

// Submit 14 tasks to the 8 workers.
for ($n = 0; $n < 14; $n++) {
    $pool->submit(new WebWork());
}

$pool->shutdown();

// Let the pool discard the tasks that report themselves complete.
$pool->collect(function ($work) {
    return $work->isComplete();
});

var_dump($pool);
LOCK_SH 取得共享鎖定(讀取的程序)
LOCK_EX 取得獨占鎖定(寫入的程序)
LOCK_UN 釋放鎖定(無論共享或獨占)
LOCK_NB 如果不希望 flock() 在鎖定時堵塞,可與 LOCK_SH 或 LOCK_EX 以「|」合併使用(例如 LOCK_EX | LOCK_NB)
<?php
// Part 1: blocking exclusive lock.
// "c+" opens the file for reading and writing, creates it when it is missing
// and does not truncate it; "r+" failed outright when the file did not exist.
$fp = fopen("/tmp/lock.txt", "c+");
if ($fp === false) {
    echo "Couldn't open the lock file!";
    exit(-1);
}

if (flock($fp, LOCK_EX)) {  // acquire an exclusive lock
    ftruncate($fp, 0);      // truncate file
    fwrite($fp, "Write something here\n");
    fflush($fp);            // flush output before releasing the lock
    flock($fp, LOCK_UN);    // release the lock
} else {
    echo "Couldn't get the lock!";
}

fclose($fp);

// Part 2: non-blocking attempt; give up immediately when somebody else holds the lock.
$fp = fopen('/tmp/lock.txt', 'c+');
if ($fp === false) {
    echo "Couldn't open the lock file!";
    exit(-1);
}

if (!flock($fp, LOCK_EX | LOCK_NB)) {
    echo 'Unable to obtain lock';
    fclose($fp);
    exit(-1);
}

flock($fp, LOCK_UN);
fclose($fp);
?>
pthreads 與 pdo 同時使用時,需要注意一點:需要靜態聲明 public static $dbh;,並且通過單例模式訪問數據庫連接。
<?php
/**
 * Task that lists the 50 newest members using the connection of the Worker
 * executing it.
 */
class Work extends Stackable
{
    public function __construct()
    {
    }

    public function run()
    {
        $dbh = $this->worker->getConnection();
        $sql = "select id,name from members order by id desc limit 50";
        $result = $dbh->query($sql);
        if ($result === false) {
            // PDO::query() returns false on failure unless the connection is in
            // exception mode; report it instead of calling fetch() on a boolean.
            $info = $dbh->errorInfo();
            printf("Query failed: %s (%s)\n", $sql, $info[2]);
            return;
        }
        while ($member = $result->fetch(PDO::FETCH_ASSOC)) {
            print_r($member);
        }
    }
}

/**
 * Worker that opens one PDO connection in run() and hands it to the tasks stacked on it.
 */
class ExampleWorker extends Worker
{
    /** @var PDO Connection created by run(). */
    public static $dbh;

    /**
     * @param string $name Accepted for labelling purposes; not used by this class.
     */
    public function __construct($name)
    {
    }

    /*
     * The run method should just prepare the environment for the work that is coming ...
     */
    public function run()
    {
        self::$dbh = new PDO('mysql:host=192.168.2.1;dbname=example', 'www', '123456');
    }

    /**
     * @return PDO The connection created by run().
     */
    public function getConnection()
    {
        return self::$dbh;
    }
}

$worker = new ExampleWorker("My Worker Thread");

$work = new Work();
$worker->stack($work);

$worker->start();
$worker->shutdown();
?>
在線程池中鏈接數據庫
# cat pool.php
<?php
/**
 * Pool worker that carries the Logging object passed to the pool.
 */
class ExampleWorker extends Worker
{
    protected $logger;

    public function __construct(Logging $logger)
    {
        $this->logger = $logger;
    }
}

/**
 * One deposit check: looks up the MT4 trade that belongs to an account's order
 * and, when found, marks the account as paid with the trade's open time.
 */
class Work extends Stackable
{
    /**
     * @param array $number Account row with the keys 'order' and 'name'.
     */
    public function __construct($number)
    {
        $this->number = $number;
    }

    public function run()
    {
        $dbhost = 'db.example.com'; // database server
        $dbuser = 'example.com';    // database user
        $dbpw   = 'password';       // database password
        $dbname = 'example_real';

        try {
            $dbh = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(
                PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
                PDO::MYSQL_ATTR_COMPRESS => true,
                PDO::ATTR_PERSISTENT => true,
                // Report SQL errors as exceptions so they reach the catch below
                // instead of surfacing as a fatal error on a false return value.
                PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
            ));

            // Values are bound as parameters instead of being concatenated
            // into the SQL text.
            $select = $dbh->prepare(
                "select OPEN_TIME, `COMMENT` from MT4_TRADES"
                . " where LOGIN = :login and CMD = '6' and `COMMENT` = :comment"
            );
            $select->execute(array(
                ':login'   => $this->number['name'],
                ':comment' => $this->number['order'] . ':DEPOSIT',
            ));
            $mt4_trades = $select->fetch(PDO::FETCH_ASSOC);
            $select = null;

            if ($mt4_trades) {
                $update = $dbh->prepare(
                    "UPDATE db_example.accounts SET paystatus='成功', deposit_time = :deposit_time"
                    . " where `order` = :order"
                );
                $update->execute(array(
                    ':deposit_time' => $mt4_trades['OPEN_TIME'],
                    ':order'        => $this->number['order'],
                ));
                $update = null;
            }

            $dbh = null;
        } catch (PDOException $e) {
            printf("error: %s, %s\n", $this->number['order'], $e->getMessage());
        }

        printf("runtime: %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId(), $this->number['order']);
    }
}

/**
 * Object handed to every pool worker; opens its own connection when constructed
 * and offers a printf-style log() helper.
 */
class Logging extends Stackable
{
    protected static $dbh;

    public function __construct()
    {
        $dbhost = 'db.example.com'; // database server
        $dbuser = 'example.com';    // database user
        $dbpw   = 'password';       // database password
        $dbname = 'example_real';   // database name

        self::$dbh = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(
            PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
            PDO::MYSQL_ATTR_COMPRESS => true
        ));
    }

    /**
     * Prints a printf-style message followed by a newline. The first call
     * argument is the format; every further argument is a value for it.
     */
    protected function log($message, $args = [])
    {
        $args = func_get_args();
        if (($message = array_shift($args))) {
            echo vsprintf("{$message}\n", $args);
        }
    }

    protected function getConnection()
    {
        return self::$dbh;
    }
}

$pool = new Pool(200, \ExampleWorker::class, [new Logging()]);

$dbhost = 'db.example.com'; // database server
$dbuser = 'example.com';    // database user
$dbpw   = 'password';       // database password
$dbname = 'db_example';

$dbh = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(
    PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
    PDO::MYSQL_ATTR_COMPRESS => true
));

// Every account that has no deposit time yet becomes one task.
$sql = "select `order`,name from accounts where deposit_time is null order by id desc";
$row = $dbh->query($sql);
if ($row === false) {
    $info = $dbh->errorInfo();
    printf("Query failed: %s (%s)\n", $sql, $info[2]);
} else {
    while ($account = $row->fetch(PDO::FETCH_ASSOC)) {
        $pool->submit(new Work($account));
    }
}

$pool->shutdown();
?>
進一步改進上面程序,我們使用單例模式 $this->worker->getInstance(); 全局僅僅做一次數據庫連接,線程使用共享的數據庫連接
<?php
/**
 * Pool worker that opens one PDO connection in run() and exposes it to the
 * tasks it executes through getInstance().
 */
class ExampleWorker extends Worker
{
    protected static $dbh;

    public function __construct()
    {
    }

    public function run()
    {
        $dbhost = 'db.example.com'; // database server
        $dbuser = 'example.com';    // database user
        $dbpw   = 'password';       // database password
        $dbname = 'example';        // database name

        self::$dbh = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(
            PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
            PDO::MYSQL_ATTR_COMPRESS => true,
            PDO::ATTR_PERSISTENT => true,
            // Without exception mode the PDOException handler in Work::run()
            // can never be reached; failures would instead show up as a fatal
            // error on a false statement handle.
            PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
        ));
    }

    /**
     * NOTE(review): declared protected yet called from Work::run(); kept as in
     * the original - verify this is valid for the pthreads version in use.
     *
     * @return PDO The connection created by run().
     */
    protected function getInstance()
    {
        return self::$dbh;
    }
}

/**
 * Task that decrypts one member's mobile number and stores it in members_digest:
 * an md5 digest of the number, or an empty string when the decrypted value is
 * the literal text 'null'.
 */
class Work extends Stackable
{
    /**
     * @param array $data Member row with the keys 'id' and 'mobile'.
     */
    public function __construct($data)
    {
        $this->data = $data;
    }

    public function run()
    {
        // Resolve both values before the try block so the error handler and the
        // final report can always refer to them.
        $id = $this->data['id'];
        // safenet_decrypt() is expected to be provided by the surrounding project.
        $mobile = safenet_decrypt($this->data['mobile']);

        // Keep only the last 11 characters of longer values.
        if (strlen($mobile) > 11) {
            $mobile = substr($mobile, -11);
        }

        try {
            $dbh = $this->worker->getInstance();

            if ($mobile == 'null') {
                $mobile = '';
                $sql = "UPDATE members_digest SET mobile = :mobile where id = :id";
            } else {
                $sql = "UPDATE members_digest SET mobile = md5(:mobile) where id = :id";
            }

            $sth = $dbh->prepare($sql);
            $sth->bindValue(':mobile', $mobile);
            $sth->bindValue(':id', $id);
            $sth->execute();
        } catch (PDOException $e) {
            $error = sprintf("%s,%s\n", $mobile, $id);
            file_put_contents("mobile_error.log", $error, FILE_APPEND);
        }

        printf("runtime: %s, %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId(), $mobile, $id);
    }
}

$pool = new Pool(100, \ExampleWorker::class, []);

$dbhost = 'db.example.com'; // database server
$dbuser = 'example.com';    // database user
$dbpw   = 'password';       // database password
$dbname = 'example';

// NOTE(review): this connection uses port 3307 while the workers connect on
// port 3306 - confirm which one is intended.
$dbh = new PDO("mysql:host=$dbhost;port=3307;dbname=$dbname", $dbuser, $dbpw, array(
    PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
    PDO::MYSQL_ATTR_COMPRESS => true
));

// Every member row becomes one task.
$sql = "select id, mobile from members order by id asc";
$row = $dbh->query($sql);
if ($row === false) {
    $info = $dbh->errorInfo();
    printf("Query failed: %s (%s)\n", $sql, $info[2]);
} else {
    while ($members = $row->fetch(PDO::FETCH_ASSOC)) {
        $pool->submit(new Work($members));
    }
}

$pool->shutdown();
?>
總的來說 pthreads 仍然處在發展中,仍有一些不足的地方,我們也可以看到pthreads的git在不斷改進這個項目
數據庫持久鏈接很重要,否則每個線程都會開啟一次數據庫連接,然後關閉,會導致很多鏈接超時。
<?php
// Open the connection as a persistent one.
// $user and $pass are expected to be defined before this point.
$dbh = new PDO(
    'mysql:host=localhost;dbname=test',
    $user,
    $pass,
    [PDO::ATTR_PERSISTENT => true]
);
?>