程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> 網頁編程 >> PHP編程 >> 關於PHP編程 >> PHP memcache實現消息隊列實例

PHP memcache實現消息隊列實例

編輯:關於PHP編程

現在memcache在服務器緩存應用比較廣泛,下面我來介紹memcache實現消息隊列等待的一個例子,有需要了解的朋友可參考。

memche消息隊列的原理就是在key上做文章,用以做一個連續的數字加上前綴記錄序列化以後消息或者日志。然後通過定時程序將內容落地到文件或者數據庫。


php實現消息隊列的用處比如在做發送郵件時發送大量郵件很費時間的問題,那麼可以采取隊列。
方便實現隊列的輕量級隊列服務器是:
starling支持memcache協議的輕量級持久化服務器
https://github.com/starling/starling
Beanstalkd輕量、高效,支持持久化,每秒可處理3000左右的隊列
http://kr.github.com/beanstalkd/
php中也可以使用memcache/memcached來實現消息隊列。

 代碼如下 復制代碼

<?php
/**
* Memcache 消息隊列類
*/

class QMC {
const PREFIX = 'ASDFASDFFWQKE';

/**
* 初始化mc
* @staticvar string $mc
* @return Memcache
*/
static private function mc_init() {
static $mc = null;
if (is_null($mc)) {
$mc = new Memcache;
$mc->connect('127.0.0.1', 11211);
}
return $mc;
}
/**
* mc 計數器,增加計數並返回新的計數
* @param string $key   計數器
* @param int $offset   計數增量,可為負數.0為不改變計數
* @param int $time     時間
* @return int/false    失敗是返回false,成功時返回更新計數器後的計數
*/
static public function set_counter( $key, $offset, $time=0 ){
$mc = self::mc_init();
$val = $mc->get($key);
if( !is_numeric($val) || $val < 0 ){
$ret = $mc->set( $key, 0, $time );
if( !$ret ) return false;
$val = 0;
}
$offset = intval( $offset );
if( $offset > 0 ){
return $mc->increment( $key, $offset );
}elseif( $offset < 0 ){
return $mc->decrement( $key, -$offset );
}
return $val;
}

/**
* 寫入隊列
* @param string $key
* @param mixed $value
* @return bool
*/
static public function input( $key, $value ){
$mc = self::mc_init();
$w_key = self::PREFIX.$key.'W';
$v_key = self::PREFIX.$key.self::set_counter($w_key, 1);
return $mc->set( $v_key, $value );
}
/**
* 讀取隊列裡的數據
* @param string $key
* @param int $max  最多讀取條數
* @return array
*/
static public function output( $key, $max=100 ){
$out = array();
$mc = self::mc_init();
$r_key = self::PREFIX.$key.'R';
$w_key = self::PREFIX.$key.'W';
$r_p   = self::set_counter( $r_key, 0 );//讀指針
$w_p   = self::set_counter( $w_key, 0 );//寫指針
if( $r_p == 0 ) $r_p = 1;
while( $w_p >= $r_p ){
if( --$max < 0 ) break;
$v_key = self::PREFIX.$key.$r_p;
$r_p = self::set_counter( $r_key, 1 );
$out[] = $mc->get( $v_key );
$mc->delete($v_key);
}
return $out;
}
}
/**
使用方法:
QMC::input($key, $value );//寫入隊列
$list = QMC::output($key);//讀取隊列
*/
?>


基於PHP共享內存實現的消息隊列:

 代碼如下 復制代碼

<?php
/**
* 使用共享內存的PHP循環內存隊列實現
* 支持多進程, 支持各種數據類型的存儲
* 注: 完成入隊或出隊操作,盡快使用unset(), 以釋放臨界區
*
* @author [email protected]
* @created 2009-12-23
*/
class ShmQueue
{
private $maxQSize = 0; // 隊列最大長度

private $front = 0; // 隊頭指針
private $rear = 0;  // 隊尾指針

private $blockSize = 256;  // 塊的大小(byte)
private $memSize = 25600;  // 最大共享內存(byte)
private $shmId = 0;

private $filePtr = './shmq.ptr';

private $semId = 0;
public function __construct()
{
$shmkey = ftok(__FILE__, 't');

$this->shmId = shmop_open($shmkey, "c", 0644, $this->memSize );
$this->maxQSize = $this->memSize / $this->blockSize;

// 申?一個信號量
$this->semId = sem_get($shmkey, 1);
sem_acquire($this->semId); // 申請進入臨界區

$this->init();
}

private function init()
{
if ( file_exists($this->filePtr) ){
$contents = file_get_contents($this->filePtr);
$data = explode( '|', $contents );
if ( isset($data[0]) && isset($data[1])){
$this->front = (int)$data[0];
$this->rear  = (int)$data[1];
}
}
}

public function getLength()
{
return (($this->rear - $this->front + $this->memSize) % ($this->memSize) )/$this->blockSize;
}

public function enQueue( $value )
{
if ( $this->ptrInc($this->rear) == $this->front ){ // 隊滿
return false;
}

$data = $this->encode($value);
shmop_write($this->shmId, $data, $this->rear );
$this->rear = $this->ptrInc($this->rear);
return true;
}

public function deQueue()
{
if ( $this->front == $this->rear ){ // 隊空
return false;
}
$value = shmop_read($this->shmId, $this->front, $this->blockSize-1);
$this->front = $this->ptrInc($this->front);
return $this->decode($value);
}

private function ptrInc( $ptr )
{
return ($ptr + $this->blockSize) % ($this->memSize);
}

private function encode( $value )
{
$data = serialize($value) . "__eof";
echo '';

echo strlen($data);
echo '';

echo $this->blockSize -1;
echo '';

if ( strlen($data) > $this->blockSize -1 ){
throw new Exception(strlen($data)." is overload block size!");
}
return $data;
}

private function decode( $value )
{
$data = explode("__eof", $value);
return unserialize($data[0]);
}

public function __destruct()
{
$data = $this->front . '|' . $this->rear;
file_put_contents($this->filePtr, $data);

sem_release($this->semId); // 出臨界區, 釋放信號量
}
}

/*
// 進隊操作
$shmq = new ShmQueue();
$data = 'test data';
$shmq->enQueue($data);
unset($shmq);
// 出隊操作
$shmq = new ShmQueue();
$data = $shmq->deQueue();
unset($shmq);
*/
?>

對於一個很大的消息隊列,頻繁進行進行大數據庫的序列化 和 反序列化,有太耗費。下面是我用PHP 實現的一個消息隊列,只需要在尾部插入一個數據,就操作尾部,不用操作整個消息隊列進行讀取,與操作。但是,這個消息隊列不是線程安全的,我只是盡量的避免了沖突的可能性。如果消息不是非常的密集,比如幾秒鐘才一個,還是可以考慮這樣使用的。
如果你要實現線程安全的,一個建議是通過文件進行鎖定,然後進行操作。下面是代碼:
代碼如下:

 代碼如下 復制代碼 class Memcache_Queue
{
private $memcache;
private $name;
private $prefix;
function __construct($maxSize, $name, $memcache, $prefix = "__memcache_queue__")
{
if ($memcache == null) {
throw new Exception("memcache object is null, new the object first.");
}
$this->memcache = $memcache;
$this->name = $name;
$this->prefix = $prefix;
$this->maxSize = $maxSize;
$this->front = 0;
$this->real = 0;
$this->size = 0;
}
function __get($name)
{
return $this->get($name);
}
function __set($name, $value)
{
$this->add($name, $value);
return $this;
}
function isEmpty()
{
return $this->size == 0;
}
function isFull()
{
return $this->size == $this->maxSize;
}
function enQueue($data)
{
if ($this->isFull()) {
throw new Exception("Queue is Full");
}
$this->increment("size");
$this->set($this->real, $data);
$this->set("real", ($this->real + 1) % $this->maxSize);
return $this;
}
function deQueue()
{
if ($this->isEmpty()) {
throw new Exception("Queue is Empty");
}
$this->decrement("size");
$this->delete($this->front);
$this->set("front", ($this->front + 1) % $this->maxSize);
return $this;
}
function getTop()
{
return $this->get($this->front);
}
function getAll()
{
return $this->getPage();
}
function getPage($offset = 0, $limit = 0)
{
if ($this->isEmpty() || $this->size < $offset) {
return null;
}
$keys[] = $this->getKeyByPos(($this->front + $offset) % $this->maxSize);
$num = 1;
for ($pos = ($this->front + $offset + 1) % $this->maxSize; $pos != $this->real; $pos = ($pos + 1) % $this->maxSize)
{
$keys[] = $this->getKeyByPos($pos);
$num++;
if ($limit > 0 && $limit == $num) {
break;
}
}
return array_values($this->memcache->get($keys));
}
function makeEmpty()
{
$keys = $this->getAllKeys();
foreach ($keys as $value) {
$this->delete($value);
}
$this->delete("real");
$this->delete("front");
$this->delete("size");
$this->delete("maxSize");
}
private function getAllKeys()
{
if ($this->isEmpty())
{
return array();
}
$keys[] = $this->getKeyByPos($this->front);
for ($pos = ($this->front + 1) % $this->maxSize; $pos != $this->real; $pos = ($pos + 1) % $this->maxSize)
{
$keys[] = $this->getKeyByPos($pos);
}
return $keys;
}
private function add($pos, $data)
{
$this->memcache->add($this->getKeyByPos($pos), $data);
return $this;
}
private function increment($pos)
{
return $this->memcache->increment($this->getKeyByPos($pos));
}
private function decrement($pos)
{
$this->memcache->decrement($this->getKeyByPos($pos));
}
private function set($pos, $data)
{
$this->memcache->set($this->getKeyByPos($pos), $data);
return $this;
}
private function get($pos)
{
return $this->memcache->get($this->getKeyByPos($pos));
}
private function delete($pos)
{
return $this->memcache->delete($this->getKeyByPos($pos));
}
private function getKeyByPos($pos)
{
return $this->prefix . $this->name . $pos;
}
}

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved