这是我觉得很难理解的事情:
cl = makeCluster(rep("localhost", 8), "SOCK")
# This will not work, error: dat not found in the nodes
pmult = function(cl, a, x)
{
mult = function(s) s*x
parLapply(cl, a, mult)
}
scalars = 1:4
dat = rnorm(4)
pmult(cl, scalars, dat)
# This will work
pmult = function(cl, a, x)
{
x
mult = function(s) s*x
parLapply(cl, a, mult)
}
scalars = 1:4
dat = rnorm(4)
pmult(cl, scalars, dat)
# This will work
pmult = function(cl, a, x)
{
mult = function(s, x) s*x
parLapply(cl, a, mult, x)
}
scalars = 1:4
dat = rnorm(4)
pmult(cl, scalars, dat)
由于参数的懒惰评估,第一个函数不起作用。但什么是懒惰评估?执行mult()时,是否不需要对x求值?第二个有效,因为它强制 x 被评估。现在最奇怪的事情发生在第三个函数中,什么也没做,只是让 mult() 接收 x 作为一个额外的参数,突然一切都正常了!
还有一点是,如果我不想在调用parLapply()的函数内部定义所有的变量和函数怎么办?以下绝对行不通:
pmult = function(cl)
{
source("a_x_mult.r")
parLapply(cl, a, mult, x)
}
scalars = 1:4
dat = rnorm(4)
pmult(cl, scalars, dat)
我可以将所有这些变量和函数作为参数传递:
f1 = function(i)
{
return(rnorm(i))
}
f2 = function(y)
{
return(f1(y)^2)
}
f3 = function(v)
{
return(v- floor(v) + 100)
}
test = function(cl, f1, f2, f3)
{
x = f2(15)
parLapply(cl, x, f3)
}
test(cl, f1, f2, f3)
或者我可以使用clusterExport(),但是当有很多对象要导出时会很麻烦。有没有更好的办法?
最佳答案
要理解这一点,您必须意识到每个函数都有一个相关的环境,而该环境是什么取决于函数的创建方式。在脚本中简单创建的函数与全局环境相关联,但由另一个函数创建的函数与创建函数的本地环境相关联。在您的示例中, pmult
创建 mult
,因此与 mult
关联的环境包含形式参数 cl
、 a
和 x
。
第一种情况的问题是 parLapply
对 x
一无所知:它只是一个未经评估的形式参数,它被 mult
序列化为 parLapply
环境的一部分。由于 x
在 mult
被序列化并发送到集群 worker 时不会被评估,因此当 worker 执行 mult
时会导致错误,因为 dat
在该上下文中不可用。换句话说,当 mult
计算 x
时,为时已晚。
第二种情况有效,因为 x
在 mult
序列化之前被评估,所以 x
的实际值与 mult
的环境一起序列化。如果您知道闭包但不知道惰性参数评估,它会做您期望的事情。
第三种情况有效,因为您有 parLapply
为您处理 x
。根本没有任何诡计。
我应该警告你,在所有这些情况下, a
正在被评估(通过 parLapply
)并与 mult
的环境一起序列化。 parLapply
也是将 a
拆分成块,然后将这些块发送给每个 worker,因此 a
环境中的 mult
副本完全没有必要。它不会导致错误,但可能会损害性能,因为 mult
会发送给每个任务对象中的工作人员。幸运的是,这对 parLapply
来说不是问题,因为每个 worker 只有一个任务。 clusterApply
或 clusterApplyLB
的任务数量等于 a
的长度,这将是一个更糟糕的问题。
我在书的“雪”一章中谈到了许多与功能和环境相关的问题。涉及一些微妙的问题,很容易被烧毁,有时没有意识到它发生了。
至于你的第二个问题,有多种策略可以将函数导出到 worker,但有些人确实使用 source
来定义 worker 上的函数,而不是使用 clusterExport
。请记住,source
有一个 local
参数,用于控制解析表达式的计算位置,您可能需要指定脚本的绝对路径。最后,如果您使用的是远程集群 worker,如果您没有分布式文件系统,您可能需要将脚本 scp 到 worker。
这是将全局环境中的所有函数导出到集群工作器的简单方法:
ex <- Filter(function(x) is.function(get(x, .GlobalEnv)), ls(.GlobalEnv))
clusterExport(cl, ex)
关于r - 如何使一个对象可用于 R 并行计算的雪包中的节点,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/16861628/