How to run PHP object methods in parallel and sync the results into an array

Question

Hi, I'm trying to find a way to run a PHP object method in parallel.

I have looked through a few solutions for multi-threading with PHP, but I can't seem to find a way to run object methods in parallel. Can someone explain what I am doing wrong and suggest a fix for any of these solutions, or an alternative example with the Countries class, where the get_data method would run in multiple parallel processes?

  1. pcntl_fork() - Forking with PHP
  2. Pthreads - PHP extension
  3. misterion/ko-process - composer package
  4. duncan3dc/fork-helper - composer package
  5. illuminate/queue - composer package

Testing pcntl_fork()

    <?php

    class Countries {
        function get_data($country){

            usleep(1000);
            for($i=0; $i<1000; $i++){
                $data[$i] = $country;
            }

            return $data;

        }
    }

    $os = new Countries;

    $countries = array("GB","US","FR","DE","IT","ES","LT","BR","BE","JP","CN");



    // how to add and control a limit of max processes running at the time?

    $start_time = microtime(true);

    foreach($countries as $country) {

        $pid = pcntl_fork();

        if (!$pid) {

            error_log( date('Y-m-d H:i:s').' - In child  '.$country." \n", 3, $log);

            // How to execute $os->get_data($country) method in a child process and collect results into $d[$country]?
            $d[$country] = $os->get_data($country);

            error_log( date('Y-m-d H:i:s').' - !pid -> d['.$country.']  ='.var_export($d[$country],true)." \n", 3, $log);
            exit(0); // exit($country) would print the string instead of setting an exit status
        }
    }

    while (pcntl_waitpid(0, $status) != -1);
    // do something with $d[$country] here after all child processes completed


    $end_time = microtime(true);
    $duration = $end_time - $start_time;
    $duration = number_format($duration,3);
    error_log( date('Y-m-d H:i:s').' - 1. pcntl_fork() example duration='.$duration."\n", 3, $log);





?>

Testing Pthreads

<?php


if (extension_loaded('pthreads')) {

    $pool = new Pool(4);

    class Countries {
        function get_data($country){

            usleep(1000);
            for($i=0; $i<1000; $i++){
                $data[$i] = $country;
            }

            return $data;

        }
    }

    $os = new Countries;

    $countries = array("GB","US","FR","DE","IT","ES","LT","BR","BE","JP","CN");


    $start_time = microtime(true);
    foreach($countries as $country) {   
        $dataN = new Threaded();
        $dataN->country = $country;
        $dataN->os = $os;
        $dataN->result = "";

        $threads[] = $dataN;

        $pool->submit(
            new class($dataN) extends Threaded {
                public $data;

                public function __construct($data)
                {
                    $this->data = $data;
                }

                public function run()
                {

                    $this->data->result = $this->data->os->get_data($this->data->country);

                }
            }
        );

    }


    while ($pool->collect());

    $pool->shutdown();

    foreach ($threads as $thread) {

        error_log( date('Y-m-d H:i:s').' - d['.$thread->country.'] = '.var_export($thread->result,true)."\n", 3, $log);
        $d[$thread->country] = $thread->result;

    }

    // do something with $d[$country] here after all child processes completed

    $end_time = microtime(true);
    $duration = $end_time - $start_time;
    $duration = number_format($duration,3);
    error_log( date('Y-m-d H:i:s').' - 2. PHP PThreads example duration='.$duration."\n", 3, $log);
}else{
    error_log( date('Y-m-d H:i:s').' - pthreads extension is not loaded!'."\n", 3, $log);

}   

?>

Testing misterion/ko-process

<?php

require_once $_SERVER["DOCUMENT_ROOT"].'/vendor/autoload.php';

    class Countries {
        function get_data($country){

            usleep(1000);
            for($i=0; $i<1000; $i++){
                $data[$i] = $country;
            }

            return $data;

        }
    }

    $os = new Countries;

    $countries = array("GB","US","FR","DE","IT","ES","LT","BR","BE","JP","CN");



    // how to add and control a limit of max processes running at the time?

    $start_time = microtime(true);



    $manager = new Ko\ProcessManager();

    foreach($countries as $country) {

        $manager->fork(function(Ko\Process $p) use ($country, $os, $log) {
            error_log( date('Y-m-d H:i:s').' - In child  '.$country." \n", 3, $log);
            // How to execute $os->get_data($country) method in a child process and collect results into $d[$country]?
            $d[$country] = $os->get_data($country);
        });

    }

    error_log( date('Y-m-d H:i:s')." - Waiting for the threads to finish... \n", 3, $log);  
    $manager->wait();

    error_log( date('Y-m-d H:i:s')." - threads finished. \n", 3, $log); 

    // do something with $d[$country] here after all child processes completed


    $end_time = microtime(true);
    $duration = $end_time - $start_time;
    $duration = number_format($duration,3);
    error_log( date('Y-m-d H:i:s').' - 3. misterion/ko-process example duration='.$duration."\n", 3, $log);



?>

Testing duncan3dc/fork-helper

<?php

require_once $_SERVER["DOCUMENT_ROOT"].'/vendor/autoload.php';

    class Countries {
        function get_data($country){

            usleep(1000);
            for($i=0; $i<1000; $i++){
                $data[$i] = $country;
            }

            return $data;

        }
    }

    $os = new Countries;

    $countries = array("GB","US","FR","DE","IT","ES","LT","BR","BE","JP","CN");



    // how to add and control a limit of max processes running at the time?

    $start_time = microtime(true);


    $fork = new \duncan3dc\Forker\Fork;

    foreach($countries as $country) {

        $fork->call(function () use ($country, $os, $log) {
            error_log( date('Y-m-d H:i:s').' - In child  '.$country." \n", 3, $log);
            // How to execute $os->get_data($country) method in a child process and collect results into $d[$country]?
            $d[$country] = $os->get_data($country);

        });


    }

    error_log( date('Y-m-d H:i:s')." - Waiting for the threads to finish... \n", 3, $log);  

    $fork->wait();
    error_log( date('Y-m-d H:i:s')." - threads finished. \n", 3, $log); 

    // do something with $d[$country] here after all child processes completed


    $end_time = microtime(true);
    $duration = $end_time - $start_time;
    $duration = number_format($duration,3);
    error_log( date('Y-m-d H:i:s').' - 4. duncan3dc/fork-helper example duration='.$duration."\n", 3, $log);





?>

Testing illuminate/queue

<?php

require_once $_SERVER["DOCUMENT_ROOT"].'/vendor/autoload.php';


    class Countries {

        public $data;

        function __construct($country){
                $this->data[$country] = $this->get_data($country);
        }

        function get_data($country){

            usleep(1000);
            for($i=0; $i<1000; $i++){
                $data[$i] = $country;
            }

            return $data;

        }
    }

    $os = new Countries;

    $countries = array("GB","US","FR","DE","IT","ES","LT","BR","BE","JP","CN");

    use Illuminate\Queue\Capsule\Manager as Queue;

    $queue = new Queue;

    $queue->addConnection([
        'driver' => 'beanstalkd',
        'host' => 'localhost',
        'queue' => 'default',
    ]);

    // Make this Capsule instance available globally via static methods... (optional)
    //$queue->setAsGlobal();


    // how to add and control a limit of max processes running at the same time?
    foreach($countries as $country) {
        $d[$country] = $queue->push('Countries', array("country"=>$country));
    }
    // how to get results after all processes completed into $d[$country]?
    // do something with results


    $end_time = microtime(true);
    $duration = $end_time - $start_time;
    $duration = number_format($duration,3);
    error_log( date('Y-m-d H:i:s').' - 5. illuminate/queue example duration='.$duration."\n", 3, $log);


?>              

Recommended answer

I can't help with pthreads, ko-process, fork-helper, or queue (I simply don't have experience using them), but this is one method to get your code working with pcntl_fork and using sockets to pass messages between child and parent process:

<?php
    class Countries {
        function get_data($country){
            usleep(1000);
            for($i=0; $i<1000; $i++){
                $data[$i] = $country;
            }

            return $data;
        }
    }

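    $os = new Countries;

    // Assumption: the original never defines $log, so a hypothetical log file path is used
    $log = __DIR__ . '/parallel.log';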

    $countries = ["GB", "US", "FR", "DE", "IT", "ES", "LT", "BR", "BE", "JP", "CN"];

    // To answer your question about limiting the number of concurrent processes, you
    // need to limit the number of times you call pcntl_fork(). You might do something
    // like:
    //    1. Chunk the $countries array: [["GB", "US"], ["FR", "DE"], ["IT", "ES"], ...
    //    2. Call pcntl_fork() once for each inner array (half as many)
    //    3. Child process calls $os->get_data() once for each country in the sub-array
    //
    // Another solution is to utilize what's known as a "Pool": you give a collection
    // of tasks to a class which spins up workers for you and hands tasks to workers
    // as they become available. This method abstracts the complexity of
    // multiprocessing, but will require you to find a third-party library you like or
    // implement the Pool class on your own.
    $start_time = microtime(true);

    // Initialize $d in the parent process (before pcntl_fork())
    $d = [];

    // Keep a list of child processes, so that we can wait for ALL of them to terminate
    $pids = [];

    // Initialize a socket for message passing (see below)
    socket_create_pair(AF_UNIX, SOCK_STREAM, 0, $socket);

    foreach($countries as $country) {
        $pid = pcntl_fork();

        if (!$pid) {
            error_log( date('Y-m-d H:i:s').' - In child  '.$country." \n", 3, $log);

            // To answer your question about how to collect the result in the $d array,
            // you need to pass the results back to the parent process via some message
            // channel. The easiest solution I know of is a socket pair.
            //
            // In order for the socket to be available to both the parent and child,
            // the socket must be created before you fork (see above)
            $data = serialize($os->get_data($country));

            // Sockets are just raw byte streams with no organization or semantics. It's
            // up to you to understand the output of the socket. I devised a basic
            // protocol here where I begin with the country code, follow it up with a
            // serialized data structure, then terminate with a double-new-line
            socket_write($socket[0], $country . " " . $data . "\n\n");
            socket_close($socket[0]);
            exit();
        }

        $pids[] = $pid;
    }

    // Close the parent's copy of the write end; otherwise the read loop below would
    // never see end-of-file
    socket_close($socket[0]);

    // Keep reading until every child has closed its write end. Reading before
    // waiting keeps children from blocking on a full socket buffer.
    $data = '';
    while (($new_data = socket_read($socket[1], 8192)) !== false && $new_data !== '') {
        $data .= $new_data;
    }
    socket_close($socket[1]);

    // Wait for all child processes to finish
    foreach($pids as $pid) {
        pcntl_waitpid($pid, $status);
    }

    // Split at double-new-line to get individual return values
    $results = explode("\n\n", $data);

    // Now parse the results (per my custom protocol I defined above)
    foreach($results as $result) {
        if ($result === '') {
            continue; // skip the empty piece after the final delimiter
        }
        $country = substr($result, 0, 2);
        $d[$country] = unserialize(substr($result, 3));
    }

    $end_time = microtime(true);
    $duration = $end_time - $start_time;
    $duration = number_format($duration, 3);

    error_log( date('Y-m-d H:i:s').' - 1. pcntl_fork() example duration='.$duration."\n", 3, $log);

?>
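The comments in the code above mention two things it does not implement: limiting how many children run at once, and a "Pool"-style helper. The following is a minimal sketch of one way to do both, not part of the original answer. The function name `run_in_parallel` and its `$maxProcs` parameter are hypothetical, and the sketch assumes PHP 7.1 or later on the command line with the pcntl and sockets extensions loaded. It swaps the single shared socket for one socket pair per child, so no custom delimiter protocol is needed.

```php
<?php
class Countries {
    function get_data($country) {
        usleep(1000);
        $data = [];
        for ($i = 0; $i < 1000; $i++) {
            $data[$i] = $country;
        }
        return $data;
    }
}

// Hypothetical helper: calls $task once per input in a forked child, keeping at
// most $maxProcs children alive. Returns the results keyed like $inputs.
function run_in_parallel(callable $task, array $inputs, int $maxProcs): array
{
    $results = [];
    $running = []; // FIFO queue of [key, pid, socket the parent reads from]

    // Drain the oldest child's socket until end-of-file, then reap the child.
    $collectOldest = function () use (&$results, &$running): void {
        [$key, $pid, $sock] = array_shift($running);
        $buffer = '';
        while (($chunk = socket_read($sock, 8192)) !== false && $chunk !== '') {
            $buffer .= $chunk;
        }
        socket_close($sock);
        pcntl_waitpid($pid, $status);
        $results[$key] = unserialize($buffer);
    };

    foreach ($inputs as $key => $input) {
        if (count($running) >= $maxProcs) {
            $collectOldest(); // free a slot before forking again
        }
        if (!socket_create_pair(AF_UNIX, SOCK_STREAM, 0, $pair)) {
            throw new RuntimeException('socket_create_pair() failed');
        }
        $pid = pcntl_fork();
        if ($pid === -1) {
            throw new RuntimeException('pcntl_fork() failed');
        }
        if ($pid === 0) {
            // Child: run the task and send the serialized result to the parent.
            socket_close($pair[1]);
            $payload = serialize($task($input));
            while ($payload !== '') {
                $written = socket_write($pair[0], $payload);
                if ($written === false) {
                    exit(1);
                }
                $payload = substr($payload, $written);
            }
            socket_close($pair[0]);
            exit(0);
        }
        // Parent: keep only the read end, so EOF arrives when the child exits.
        socket_close($pair[0]);
        $running[] = [$key, $pid, $pair[1]];
    }

    while ($running) {
        $collectOldest();
    }
    return $results;
}

$countries = ["GB", "US", "FR", "DE", "IT"];
$d = run_in_parallel([new Countries, 'get_data'], array_combine($countries, $countries), 2);
echo count($d), " results collected\n";
```

Collecting the oldest child first keeps the code short and avoids deadlocks, at the cost of sometimes waiting on a slow child while a faster one has already finished. A fuller pool would probably use `socket_select()` to read from whichever child is ready.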

One thing I do want to note: multiprocessing often doesn't magically make programs run faster the way people expect. If a task is CPU-bound (that is, you spend all of your time performing complex CPU operations), multiprocessing only helps while there are idle CPU cores to run the extra processes; beyond that it will either have no effect or make things slower. If a task is IO-bound (that is, you spend all of your time waiting on network or disk operations to complete), you can speed it up dramatically by allowing the processor to do meaningful work instead of sitting on its hands and waiting.
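To make the IO-bound case concrete, here is a small timing sketch. It is an illustration under stated assumptions, not a benchmark from the original answer: `fake_io_task` is a hypothetical stand-in that sleeps instead of doing real network or disk work, and the script assumes the pcntl extension on the PHP command line.

```php
<?php
// Hypothetical IO-bound task: the process waits instead of using the CPU.
function fake_io_task(): void
{
    usleep(200000); // 0.2 s, stands in for a network or disk wait
}

$tasks = 4;

// Sequential: the total time is roughly the sum of all the waits.
$start = microtime(true);
for ($i = 0; $i < $tasks; $i++) {
    fake_io_task();
}
$sequential = microtime(true) - $start;

// Forked: the waits overlap, so the total is roughly one wait plus fork overhead.
$start = microtime(true);
$pids = [];
for ($i = 0; $i < $tasks; $i++) {
    $pid = pcntl_fork();
    if ($pid === -1) {
        fwrite(STDERR, "pcntl_fork() failed\n");
        exit(1);
    }
    if ($pid === 0) {
        fake_io_task();
        exit(0);
    }
    $pids[] = $pid;
}
foreach ($pids as $pid) {
    pcntl_waitpid($pid, $status);
}
$forked = microtime(true) - $start;

printf("sequential: %.2fs, forked: %.2fs\n", $sequential, $forked);
```

Replacing the sleep with a tight arithmetic loop and raising `$tasks` above the number of CPU cores would be one way to observe the CPU-bound behaviour described above.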

11-03 13:38