php类代码: <div class="codetitle"><a style="CURSOR: pointer" data="31644" class="copybut" id="copybut31644" onclick="doCopy('code31644')"> 代码如下:<div class="codebody" id="code31644"> <?php class MQ{ public static $client; private static $m_real; private static $m_front; private static $m_data = array(); const QUEUE_MAX_NUM = 100000000; const QUEUE_FRONT_KEY = '_queue_item_front'; const QUEUE_REAL_KEY = '_queue_item_real'; public static function setupMq($conf) { self::$client = memcache_pconnect($conf); self::$m_real = memcache_get(self::$client,self::QUEUE_REAL_KEY); self::$m_front = memcache_get(self::$client,self::QUEUE_FRONT_KEY); if (!isset(self::$m_real) || emptyempty(self::$m_real)) { self::$real= 0; } if (!isset(self::$m_front) || emptyempty(self::$m_front)) { self::$m_front = 0; } return self::$client; } public static function add($queue,$data) { $result = false; if (self::$m_real < self::QUEUE_MAX_NUM) { if (memcache_add(self::$client,$queue.self::$m_real,$data)) { self::mqRealChange(); $result = true; } } return $result; } public static function get($key,$count) { $num = 0; for ($i=self::$m_front;$i<self::$m_front + $count;$i++) { if ($dataTmp = memcache_get(self::$client,$key.$i)) { self::$m_data[] = $dataTmp; memcache_delete(self::$client,$key.$i); $num++; } } if ($num>0) { self::mqFrontChange($num); } return self::$m_data; } private static function mqRealChange() { memcache_add(self::$client,self::QUEUE_REAL_KEY,0); self::$m_real = memcache_increment(self::$client,1); } private static function mqFrontChange($num) { memcache_add(self::$client,self::QUEUE_FRONT_KEY,0); self::$m_front = memcache_increment(self::$client,$num); } public static function mflush($memcache_obj) { memcache_flush($memcache_obj); } public static function Debug() { echo 'real:'.self::$m_real." /r/n"; echo 'front:'.self::$m_front." /r/n"; echo 'wait for process data:'.intval(self::$m_real - self::$m_front); echo " /r/n"; echo ''; print_r(self::$m_data); echo ''; } } define('FLUSH_MQ',0);//CLEAN ALL DATA define('IS_ADD',0);//SET DATA $mobj = MQ::setupMq('127.0.0.1','11211'); if (FLUSH_MQ) { MQ::mflush($mobj); } else { if (IS_ADD) { MQ::add('user_sync','1test'); MQ::add('user_sync','2test'); MQ::add('user_sync','3test'); MQ::add('user_sync','4test'); MQ::add('user_sync','5test'); MQ::add('user_sync','6test'); } else { MQ::get('user_sync',10); } } MQ::Debug(); ?> 使用方法 <div class="codetitle"><a style="CURSOR: pointer" data="23358" class="copybut" id="copybut23358" onclick="doCopy('code23358')"> 代码如下:<div class="codebody" id="code23358"> MQ::setupMq('127.0.0.1','11211');//连接 MQ::add($key,$value);//添加数据到队列 MQ::add($key,$value);//添加数据到队列 MQ:get($key,10);//取出一定数量的数据
(编辑:李大同)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|