文字

Pool 类

(PECL pthreads >= 2.0.0)

简介

Pool 对象是多个 Worker 对象的容器,同时也是它们的控制器。

线程池是对 Worker 功能的高层抽象,包括按照 pthreads 需要的方式来管理应用的功能。

类摘要

Pool {
protected $size ;
protected $class ;
protected $workers ;
protected $work ;
protected $ctor ;
protected $last ;
public void collect ( Callable $collector )
public Pool __construct ( integer $size , string $class [, array $ctor ] )
public void resize ( integer $size )
public void shutdown ( void )
public integer submit ( Threaded $task )
public integer submitTo ( integer $worker , Threaded $task )
}

属性

size

Pool 对象可容纳的 Worker 对象的最大数量

class

Worker 的类

ctor

构造新的 Worker 对象时所需的参数

workers

指向 Worker 对象的引用

work

指向提交到 Pool 对象中的 Threaded 对象的引用

last

最后使用的 Worker 对象在池中的位置偏移量

Table of Contents

  • Pool::collect — 回收已完成任务的引用
  • Pool::__construct — 创建新的 Worker 对象池
  • Pool::resize — 改变 Pool 对象的可容纳 Worker 对象的数量
  • Pool::shutdown — 停止所有的 Worker 对象
  • Pool::submit — 提交对象以执行
  • Pool::submitTo — 提交对象以执行

用户评论:

[#1] olavk [2015-03-30 13:48:36]

Simple example with Collectable (basically Thread meant for Pool) and Pool

<?php

class job extends Collectable {
  public 
$val;

  public function 
__construct($val){
    
// init some properties
    
$this->val $val;
  }
  public function 
run(){
    
// do some work
    
$this->val $this->val file_get_contents('http://www.example.com/'nullnull320);
    
$this->setGarbage();
  }
}

// At most 3 threads will work at once
$p = new Pool(3);

$tasks = array(
  new 
job('0'),
  new 
job('1'),
  new 
job('2'),
  new 
job('3'),
  new 
job('4'),
  new 
job('5'),
  new 
job('6'),
  new 
job('7'),
  new 
job('8'),
  new 
job('9'),
  new 
job('10'),
);
// Add tasks to pool queue
foreach ($tasks as $task) {
  
$p->submit($task);
}

// shutdown will wait for current queue to be completed
$p->shutdown();
// garbage collection check / read results
$p->collect(function($checkingTask){
  echo 
$checkingTask->val;
  return 
$checkingTask->isGarbage();
});

?>

[#2] fajan [2014-05-28 13:18:57]

Example class to demonstrate usage of Pool/Worker mechanism, also to show a few tricks & hints ;)
<?php
class Config extends Threaded{    // shared global object
    
protected $val=0$val2=0;
    protected function 
inc(){++$this->val;}    // protected synchronizes by-object
    
public function inc2(){++$this->val2;}    // no synchronization
}
class 
WorkerClass extends Worker{
    protected static 
$worker_id_next = -1;
    protected 
$worker_id;
    protected 
$config;
    public function 
__construct($config){
        
$this->worker_id = ++static::$worker_id_next;    // static members are not avalable in thread but are in 'main thread'
        
$this->config $config;
    }
    public function 
run(){
        global 
$config;
        
$config $this->config;    // NOTE: setting by reference WON'T work
        
global $worker_id;
        
$worker_id $this->worker_id;
        echo 
"working context {$worker_id} is created!\n";
        
//$this->say_config();    // globally synchronized function.
    
}
    protected function 
say_config(){    // 'protected' is synchronized by-object so WON'T work between multiple instances
        
global $config;        // you can use the shared $config object as synchronization source.
        
$config->synchronized(function() use (&$config){    // NOTE: you can use Closures here, but if you attach a Closure to a Threaded object it will be destroyed as can't be serialized
            
var_dump($config);
        });
    }
}
class 
Task extends Stackable{    // Stackable still exists, it's just somehow dissappeared from docs (probably by mistake). See older version's docs for more details.
    
protected $set;
    public function 
__construct($set){
        
$this->set $set;
    }
    public function 
run(){
        global 
$worker_id;
        echo 
"task is running in {$worker_id}!\n";
        
usleep(mt_rand(1,100)*100);
        
$config $this->getConfig();
        
$val $config->arr->shift();
        
$config->arr[] = $this->set;
        for (
$i $i 1000; ++$i){
            
$config->inc();
            
$config->inc2();
        }
    }
    public function 
getConfig(){
        global 
$config;    // WorkerClass set this on thread's scope, can be reused by Tasks for additional asynch data source. (ie: connection pool or taskqueue to demultiplexer)
        
return $config;
    }
}

$config = new Config;
$config->arr = new \Threaded();
$config->arr->merge(array(1,2,3,4,5,6));
class 
PoolClass extends Pool{
    public function 
worker_list(){
        if (
$this->workers !== null)
            return 
array_keys($this->workers);
        return 
null;
    }
}
$pool = new PoolClass(3'WorkerClass', [$config] );
$pool->worker_list();
//$pool->submitTo(0,new Task(-10));    // submitTo DOES NOT try to create worker

$spammed_id = -1;
for (
$i 1$i <= 100; ++$i){    // add some jobs
    
if ($spammed_id == -&& ($x $pool->worker_list())!= null && @$x[2]){
        
$spammed_id $x[2];
        echo 
"spamming worker {$spammed_id} with lots of tasks from now on\n";
    }
    if (
$spammed_id != -&& ($i 5) == 0)    // every 5th job is routed to one worker, so it has 20% of the total jobs (with 3 workers it should do ~33%, not it has (33+20)%, so only delegate to worker if you plan to do balancing as well... )
        
$pool->submitTo($spammed_id,new Task(10*$i));    
    else
        
$pool->submit(new Task(10*$i));
}
$pool->shutdown();
var_dump($config); // "val" is exactly 100000, "val2" is probably a bit less
// also: if you disable the spammer, you'll that the order of the "arr" is random.
?>

上一篇: 下一篇: