文字

Cond 类

(PECL pthreads >= 2.0.0)

简介

Cond 类提供一组用来直接访问 Posix 条件变量的静态方法。

类摘要

Cond {
final public static boolean broadcast ( long $condition )
final public static long create ( void )
final public static boolean destroy ( long $condition )
final public static boolean signal ( long $condition )
final public static boolean wait ( long $condition , long $mutex [, long $timeout ] )
}

Table of Contents

  • Cond::broadcast — 广播条件变量
  • Cond::create — 创建一个条件变量
  • Cond::destroy — 销毁条件变量
  • Cond::signal — 发送唤醒信号
  • Cond::wait — 等待

用户评论:

[#1] jan [2013-05-25 13:11:45]

As Cond (nor Mutex) is good for queuing tasks, and arrays/objects stored a "serialized" (can't use it as excepted for signals outside the run()) here is a simple task queue trait:
<?php
define
('T',1000000);
trait 
TaskQueue{
    private 
$queue_len 0;
    private 
$queue_pointer 0;
    private 
$queue_curr 0;
    protected function 
queueInit(){$this->queue_len 0$this->queue_pointer 0$this->queue_curr 0;}
    protected function 
getNextQueued(){
        if (
$this->queue_len == 0) return null;
        
$p $this->queue_curr;
        
$x 0;
        
$ret = [];
        do{
            
$key "queue_func_{$p}";
            if (
$this->$key){
                
$ret['func'] = $this->$key;
                unset(
$this->$key);
                
$key "queue_params_{$p}";
                
$prs $this->$key;
                unset(
$this->$key);
                
$ret['params'] = [];
                for (
$i 0$i $prs$i++){
                    
$key "queue_param_{$p}_{$i}";
                    
$ret['params'][] =  $this->$key;
                    unset(
$this->$key);
                }
                
$this->queue_len--;
                
$this->queue_curr++;
                return 
$ret;
            }
            else 
$p++;
            if (
$x++ > 1000){   // the queue is corrupt
                
echo "\n\t\t task queue is corrupt for ".get_called_class()."\n";
                
$this->queue_len 0;
                
$this->queue_pointer 0;
                return 
null;
            }
        }while(isset(
$this->$key));
        return 
null;

    }
    protected function 
addToQueue($func,$params = []){
        if (!
is_array($params) || !@method_exists($this,$func)) return false;
        
$p $this->queue_pointer;
        
$key "queue_func_{$p}";
        
$this->$key $func;
        
$key "queue_params_{$p}";
        
$this->$key count($params);
        
$i 0;
        foreach (
$params as $pa){
            
$key "queue_param_{$p}_{$i}";
            
$this->$key $pa;
            
$i++;
        }
        
$this->queue_pointer++;
        
$this->queue_len++;
        return 
true;
    }
}

class 
extends Thread{
    use 
TaskQueue;
    private 
$sig_term;
    public function 
__construct(){
        
$this->cond Cond::create();
        
$this->sig_term false;
        
$this->queueInit(); // threads don't like default parameters
        
$this->start();
    }

    public function 
run(){
        while(!
$this->sig_term){
            while((
$qi $this->getNextQueued())){
                
call_user_func_array([$this,$qi['func']],$qi['params']);
            }
            echo 
"now doing MAIN -- ";
            
usleep(1*T);
            echo 
"finished MAIN\n";
        }
        
Mutex::destroy($m);
    }

    private function 
reallyDoA($a){
        echo 
"doing A: {$a} -- ";
        
usleep(0.7*T);
        echo 
"finished A\n";

    }
    public function 
doA($param1){
        
$this->addToQueue('reallyDoA',[$param1]);
    }
    private function 
reallyDoB($a,$b){
        echo 
"doing B: {$a},{$b} -- ";
        
usleep(0.1*T);
        echo 
"finished B\n";

    }
    public function 
doB($param1$param2){
        
$this->addToQueue('reallyDoB',[$param1,$param2]);
    }
    public function 
term(){$this->sig_term true;}
}

$a = new A();
usleep(1.2*T);
$a->doA(2);
$a->doA(3);
$a->doB("now b","something");
usleep(5.4*T);
$a->doA(5);
$a->doA(7);

usleep(3*T);
$a->term();
$a->join();
?>

very useful if you need syncronization from an outside event (ie: async socket read/write - your run() takes care of reading with timeout / signalig on_packet events, but writing may occur at any time (ie async heartbeeting) and fwrite($sock) while stream_select($sock) is waiting for timeout causes segfaults...)

上一篇: 下一篇: