本文介绍了PHP pthreads-共享对象的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在寻找一种安全快捷的方式来使用共享库.

Im searching a safe and fast way to use a shared object.

我已经在这里问了一个问题: https://github.com/krakjoe/pthreads/issues /470 但显然这不是正确的地方.

I asked the question already here: https://github.com/krakjoe/pthreads/issues/470but obviuously this wasnt the right place.

尝试与许多其他上下文(线程)共享对象(线程).所有线程都在更新此shard对象-它们可以设置自己的请求,并且还必须响应来自其他对象的请求.

Trying to share an object (Threaded) with many other contextes (Thread).All threads are updating this shard object -- they can set own requests and have to respond to requests from others also.

现在,克拉科夫回应说我在遇到问题时无法进行锁定/解锁.

Now that krakjoe responded that lock/unlock wont be available in 7 i got a problem.

我了解:.synchronized,但不知道如何使用它来使其满足我的需求.

I know about :.synchronized but have no idea how to use it to get it working for my needs.

我如何使用:: synchronized编写类似的方法

How can i use ::synchronized to write methods like

  • lock()
  • unlock()
  • is_locked()-检查是否已锁定,如果不锁定,请继续尝试,稍后再试

我写了一个(imo)非常简单的测试脚本.

i wrote a (imo) very easy test script.

此脚本包括 syc/lock/...方法atm.

this script includes no syc/lock/... methods atm.

它应该只显示我正在尝试做的事情.

it should just show what im trying to do.

我仍在寻找使用::的方式来确保此共享的安全性.

im still searching a way to use :: to make this shared safe.

代码:

<?php
/*
TEST:
    create n threads
    each will
        - Shared::set() its own ref
        - check if Shared::exists() its own ref
        - Shared::get() its ref back
        - call method ::isRunning() at returned val to easily check if is ref or got overwritten by another context

TODO:
    using ::synchronized to handle multi-context-access

NOTES:
    every method as public to prevent pthreads v2 "Method Modifiers - Special Behaviour"
        see: "Method Modifiers - Special Behaviour"
            at http://blog.krakjoe.ninja/2015/08/a-letter-from-future.html
*/
class Shared extends Threaded
{
    public $data;
    public function exists($ident)
    {
        return isset($this->data[$ident]);
    }
    public function set($ident, $ref)
    {
        $return = false;
        if(!isset($this->data[$ident])){
            $data = $this->data;
            $data[$ident] = $ref;
            $this->data = $data;
            $return = $this->data[$ident];
        }
        #echo __METHOD__ . '(' . $ident . ') => ' . gettype($return) . PHP_EOL;
        return $return;
    }
    public function get($ident)
    {
        $return = false;
        if($this->exists($ident) === true){
            $data = $this->data;
            $return = $data[$ident];
            unset($data[$ident]);
            $this->data = $data;
        }
        #echo __METHOD__ . '(' . $ident . ') => ' . gettype($return) . PHP_EOL;
        return $return;
    }
}

class T extends Thread
{
    public $count;
    public function __construct(Shared $Shared, $ident)
    {
        $this->Shared = $Shared;
        $this->ident = $ident;
    }
    public function run()
    {
        $slowdown = true;
        $this->count = 0;
        while(true){
            if($slowdown){
                // "don't allow usleep or sleep" : https://github.com/krakjoe/pthreads/commit/a157b34057b0f584b4db326f30961b5c760dead8
                //  loop a bit to simulate work:
                $start = microtime(true);
                $until = rand(1, 100000)/1000000;
                while(microtime(true)-$start < $until){
                    // ...
                }
            }

            if($this->Shared->exists($this->ident) === true){
                $ref = $this->Shared->get($this->ident);
            }
            else{
                $ref = $this->Shared->set($this->ident, $this);
            }
            // calling a method on $ref -- if not a ref we crash
            $ref->isRunning();
            unset($ref);
            $this->count++;
        }
    }
}


echo 'start ...' . PHP_EOL;

$n = 8;
$Shared = new Shared();
for($i = 0, $refs = array(); $i < $n; $i++){
    $refs[$i] = new T($Shared, $i);
    $refs[$i]->start();
}

while(!empty($refs)){
    // print status:
    if(!isset($t)or microtime(true)-$t > 1){
        $t = microtime(true);
        echo 'status: ' . count($refs) . ' running atm ...' . PHP_EOL;
    }

    // join crashed threads:
    foreach($refs as $i => $thread){
        if($thread->isRunning() === false){
            echo 'T-' . $i . ' stopped after ' . $thread->count . PHP_EOL;
            if($thread->isJoined() === false){
                $thread->join();
            }
            unset($refs[$i]);
        }
    }
}

echo 'no thread running anymore.' . PHP_EOL;

/* output
start ...
status: 8 running atm ...

Notice: Undefined offset: 6 in ...\shared_test.php on line 33

Fatal error: Call to a member function isRunning() on null in ...\shared_test.php on line 82
T-6 stopped after 10
status: 7 running atm ...

Notice: Undefined offset: 4 in ...\shared_test.php on line 33

Fatal error: Call to a member function isRunning() on null in ...\shared_test.php on line 82
T-4 stopped after 35
status: 6 running atm ...

Notice: Undefined offset: 7 in ...\shared_test.php on line 33

Fatal error: Call to a member function isRunning() on null in ...\shared_test.php on line 82
T-7 stopped after 43
status: 5 running atm ...
status: 5 running atm ...
status: 5 running atm ...

[...]
*/
?>

推荐答案

Threaded对象已经是线程安全的,也就是说,无论何时您读取,写入,检查是否存在或删除(未设置)如果是成员,则该操作是原子操作-在发生第一操作时,没有其他上下文可以执行任何上述操作.对于用户不知道的引擎处理程序,情况也是如此,直到最低级别的所有内容都是隐式安全的.

Threaded objects are already thread safe, that is to say that, any time you read, write, check for the existence of, or delete (unset) a member, the operation is atomic - no other context can perform any of the aforementioned operations while the first operation takes place. The same is true for engine handlers that the user is unaware of, everything down to the lowest level is implicitly safe.

但是,这确实让人放心...当逻辑变得更加复杂时,这有明显的局限性,例如,在设置成员之前检查成员的存在或进行其他操作(如您所做的那样):对象是原子的,没有什么可以阻止另一个上下文unset在对isset的调用与对属性/维度的调用之间的成员.

Quite re-assuring, however ... This has obvious limits when the logic gets more complex, such as checking the existence of a member before setting or doing something else with it, as you are doing: While the operations on the object are atomic, there is nothing to stop another context unseting a member between the call to isset and the call to read the property/dimension.

这适用于PHP7(pthreads v3 +)

This applies to PHP7 (pthreads v3+)

安全和完整性在这里是两件不同的事情.当完整性很重要时,您可以在PHP7中使用Threaded::synchronized正确保留它.在PHP5中,您也可以保留它,但是代码和解释会更加复杂.

Safety, and integrity are two different things here. When integrity is important you can use Threaded::synchronized in PHP7 to preserve it correctly. In PHP5 you could preserve it also, but the code would be more complicated, as would the explanation.

如果我理解这是逻辑的话,您的第二个示例应无限期运行.因此,我将使用该假设来构建正确的代码,然后对这个无限循环中您可能想要做的事情做进一步的假设,并在需要的地方提供一些见识.

Your second example should run indefinitely, if I understand it's logic. So I'm using that assumption to construct the correct code, I'm going to make further assumptions about what you might want to do in this endless loop and provide some insight where it seems required.

<?php
class Referee extends Threaded {

    public function find(string $ident, Threaded $reference) {
        return $this->synchronized(function () use($ident, $reference) {
            if (isset($this[$ident])) {
                return $this[$ident];
            } else return ($this[$ident] = $reference);
        });
    }

    public function foreach(Closure $closure) {
        $this->synchronized(function() use($closure) {
            foreach ($this as $ident => $reference) {
                $closure($ident, $reference);
            }
        });
    }
}

class Test extends Thread {

    public function __construct(Referee $referee, string $ident, bool $delay) {
        $this->referee = $referee;
        $this->ident   = $ident;
        $this->delay   = $delay;
    }

    public function run() {
        while (1) {
            if ($this->delay) {
                $this->synchronized(function(){
                    $this->wait(1000000);
                });
            }

            $reference =
                $this->referee->find($this->ident, $this);

            /* do something with reference here, I guess */

            /* do something with all references here */
            $this->referee->foreach(function($ident, $reference){
                var_dump(Thread::getCurrentThreadId(),
                        $reference->getIdent(),
                        $reference->isRunning());
            });
        }
    }

    public function getIdent() {
        return $this->ident;
    }

    private $referee;
    private $ident;
    private $delay;
}

$referee = new Referee();
$threads = [];
$thread = 0;
$idents = [
    "smelly",
    "dopey",
    "bashful",
    "grumpy",
    "sneezy",
    "sleepy",
    "happy",
    "naughty"
];

while ($thread < 8) {
    $threads[$thread] = new Test($referee, $idents[$thread], rand(0, 1));
    $threads[$thread]->start();
    $thread++;
}

foreach ($threads as $thread)
    $thread->join();
?>

因此,我们将研究这些差异,我将告诉您它们为什么如此,以及您将如何编写它们,您已经知道我们现在不是在谈论安全,而是在为您提供完整性正如所解释的,(非常出色的)假设是您写的任何东西都是安全的".

So we'll look at the differences, I'll tell you why they are as they are and how else you might write them, you already know that we're not talking about safety now, but integrity, you are afforded the (quite remarkable) assumption that anything you write is "safe", as explained.

第一个主要区别是:

if ($this->delay) {
    $this->synchronized(function(){
        $this->wait(1000000);
    });
}

这只是进行Thread等待的一种合适方法,您不必使用Thread本身进行同步,可以使用任何Threaded对象.适当地做事(如果不清楚的话)的好处是,使用::wait可以使sleep和usleep不会使线程处于接受状态.

This is simply a suitable way to make a Thread wait, you wouldn't have to use the Thread itself to synchronize, you could use any Threaded object. The benefit of doing things properly, in case not clear is that, sleep and usleep do not leave threads in a receptive state, using ::wait does.

在现实世界中,您实际上只应该等待 东西,那将是一个更复杂的块,它可能(并且应该)看起来像:

In the real world, where you really should only ever wait for something, that would be a more complex block, it might (and should) look more like:

if ($this->delay) {
    $this->synchronized(function(){
        while ($this->condition) {
            $this->wait(1000000);
        }
    });
}

注意:从技术上讲,等待超时是在等待某事,但是,除了达到超时之外,您可能会被唤醒,因此应该为此准备代码.

这样,另一个上下文能够通知Thread它应该优雅地停止等待和关闭,或者立即执行一些其他重要操作,只需通过同步,更改条件并通知Thread即可.

Such that, another context is able to notify the Thread that it should stop waiting and shutdown gracefully, or carry out some other important action immediately, simply by synchronizing, changing a condition and notifying the Thread.

对于可预测的代码,熟悉同步,等待和通知工作的方式非常重要.

For predictable code, it's extremely important to get comfortable with how synchronized, wait and notify work.

接下来,我们有逻辑来设置和/或获取引用:

Next we have our logic for setting and or getting the reference:

$reference =
    $this->referee->find($this->ident, $this);

哪个调用此:

public function find(string $ident, Threaded $reference) {
    return $this->synchronized(function () use($ident, $reference) {
        if (isset($this[$ident])) {
            return $this[$ident];
        } else return ($this[$ident] = $reference);
    });
}

这是一个不好称的名字,很难命名,但是您可以看到在进行这些分组操作时,同步可以保持完整性.稍作调整,也可以使用相同的方法来获取对另一个对象的引用.

This is badly named, naming things is hard, but you can see that integrity is preserved by synchronization while these grouped operations take place. The same method could also be used to fetch a reference to another object, with a bit of tweaking.

我想您会对那个特定的引用(当前总是$this)进行操作.我猜不到继续...

I guess you do something with that particular reference (which is always going to be $this currently). I can't guess what. Moving on ...

我已经假设您要对每个Threads做一些事情,并且您想在整个迭代进行时保持数据的完整性:

I've made the assumption that you'll want to do something with each of these Threads, and you want to preserve the integrity of the data while the entire iteration takes place:

$this->referee->foreach(function($ident, $reference){
    var_dump(Thread::getCurrentThreadId(),
            $reference->getIdent(),
            $reference->isRunning());
});

哪个调用:

public function foreach(Closure $closure) {
    $this->synchronized(function() use($closure) {
        foreach ($this as $ident => $reference) {
            $closure($ident, $reference);
        }
    });
}

这就是您将要执行的操作的方式.

This is how you would do such a thing.

值得一提的是,此处不一定需要同步;就像从迭代数组中删除一个成员一样,也不会发生任何不良情况;如果在迭代发生时对某个对象进行设置,设置或其他操作,也不会造成任何不良后果.

It's worthy of mention that synchronized is not necessarily required here; just as nothing bad will happen if you remove a member from an array you are iterating over, nothing bad will happen if you unset or set or do anything else to an object while iteration occurs.

这篇关于PHP pthreads-共享对象的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-01 18:24