Can anybody give an example of sending an Erlang module and an Erlang function to query.map() in the Python Riak client? The documentation says:
function (string, list) – Either a named Javascript function (ie: ‘Riak.mapValues’), or an anonymous javascript function (ie: ‘function(...) ... ‘ or an array [‘erlang_module’, ‘function’].
options (dict) – phase options, containing ‘language’, ‘keep’ flag, and/or ‘arg’.
But there is no clear information on what exactly I have to send. So far I have been calling the query.map() phase like this:
query.map(['maps','fun']) # maps is the maps.erl and fun is the function in the maps.erl file
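As the documentation describes, I have set the beam file path in app.config so that Riak can find the compiled .beam files. I did all of this, but I get an error after running the commands: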
query.map(['maps','funs'])
>>> query.run()
Traceback (most recent call last):
File "<input>", line 1, in <module>
File "/usr/lib/python2.6/site-packages/riak-1.5.2-py2.6.egg/riak/mapreduce.py", line 234, in run
result = t.mapred(self._inputs, query, timeout)
File "/usr/lib/python2.6/site-packages/riak-1.5.2-py2.6.egg/riak/transports/http.py", line 322, in mapred
(repr(response[0]), repr(response[1])))
Exception: Error running MapReduce operation. Headers: {'date': 'Mon, 26 May 2014 11:24:04 GMT', 'content-length': '1121', 'content-type': 'application/json'
, 'http_code': 500, 'server': 'MochiWeb/1.1 WebMachine/1.10.0 (never breaks eye contact)'} Body: '{"phase":0,"error":"undef","input":"{ok,{r_object,<<\\"tst\
\">>,<<\\"test5\\">>,[{r_content,{dict,3,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[[<<\\"X-Riak-VTag\\
">>,50,53,75,69,55,80,113,109,65,69,117,106,109,109,99,65,72,101,75,82,115,86]],[[<<\\"index\\">>]],[],[[<<\\"X-Riak-Last-Modified\\">>|{1400,340359,663135}]
],[],[]}}},<<\\"6\\">>}],[{<<197,82,177,11,83,115,139,10>>,{1,63567559559}}], {dict,1,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],
[],[],[],[],[],[],[],[],[],...}}},...},...}","type":"error","stack":"[{maps,funs, [{r_object,<<\\"tst\\">>,<<\\"test5\\">>,[{r_content,{dict,3,16,16,8,80,48,{
[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[], [[<<\\"X-Riak-VTag\\">>,50,53,75,69,55,80,113,109,65,69,117,106,109,109,99,6
5,72,101,75,82,115,86]],[[<<\\"index\\">>]],[],[[<<\\"X-Riak-Last-Modified\\">>|{1400,340359,663135}]],[],[]}}},<<\\"6\\">>}],[{<<197,82,177,11,83,115,139,10
>>,{1,63567559559}}],{dict,1,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],...}}},...},...],...},...]"}'
What am I missing? Please advise.
Best answer
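There are 3 parts to using an Erlang map function from the Python client:
1. Write and compile the Erlang module
2. Prepare the Riak cluster
3. Call the function from the Python client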
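The Erlang module should be fairly straightforward. For this example, I will have the map function return the number of values (siblings) for each key: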
-module(custom_mr).
-export([mapcount/3]).
mapcount(Obj,_Keydata,_Arg) ->
    [length(riak_object:get_values(Obj))].
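The list wrapper in mapcount/3 matters: a map function returns a list, and the lists returned for the individual objects are concatenated into the phase result. A minimal sketch of that contract in plain Python follows. It does not use the riak client; run_map_phase and the sample data are hypothetical names made up for illustration.

```python
# Toy model of a map phase in plain Python. This is NOT the riak client API.

def mapcount(values, keydata=None, arg=None):
    """Mirror of custom_mr:mapcount/3: a one-element list with the sibling count."""
    return [len(values)]

def run_map_phase(map_fun, objects):
    """Call map_fun once per object and concatenate the returned lists."""
    results = []
    for values in objects:
        results.extend(map_fun(values))
    return results

# Three hypothetical objects, each given as its list of sibling values.
# The second object has two siblings.
objects = [["v1"], ["v2a", "v2b"], ["v3"]]
counts = run_map_phase(mapcount, objects)
print(counts)
```

Returning a bare integer instead of a list would break the concatenation step, which is why the Erlang function wraps its result in square brackets.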
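Erlang versions vary in subtle ways, so it is safest to compile with the Erlang bundled with Riak or, if you built Riak from source, with the same Erlang version you built it with. The resulting .beam file needs to be placed in a directory that is readable by the user Riak runs as, which defaults to riak if you used a package install. You will need to deploy the .beam file and modify app.config on every node in the cluster.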
# /usr/lib/riak/erts-5.9.1/bin/erlc custom_mr.erl
# mkdir /var/lib/riak/custom_code
# mv custom_mr.beam /var/lib/riak/custom_code
# chown -R riak:riak /var/lib/riak/custom_code
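Then edit app.config and add
{add_paths,["/var/lib/riak/custom_code"]}
to the riak_kv section, then restart the node. Test from
riak attach
to make sure the new module has been loaded. In this example, nodes 1-4 have loaded the module, but node5 is down:
# riak attach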
1> riak_core_util:rpc_every_member_ann(code,which,[custom_mr]).
{[{'[email protected]',"/var/lib/riak/custom_code/custom_mr.beam"},
{'[email protected]',"/var/lib/riak/custom_code/custom_mr.beam"},
{'[email protected]',"/var/lib/riak/custom_code/custom_mr.beam"},
{'[email protected]',"/var/lib/riak/custom_code/custom_mr.beam"}],
['[email protected]']}
2> custom_mr:mapcount(riak_object:new(<<"test">>,<<"test">>,<<"test">>),keydata,arg).
[1]
(Detach from the riak console with ctrl-d if you are running a pre-1.4 version, otherwise use ctrl-c a.)
Finally, the Python code (I used the filename test.py):
import riak
client = riak.RiakClient()
test_bucket = client.bucket('test')
data1 = test_bucket.new('key1',data={'field1':'1data1','field2':'1data2','field3':1})
data1.store()
data2 = test_bucket.new('key2',data={'field1':'2data1','field2':'2data2','field3':2})
data2.store()
data3 = test_bucket.new('key3',data={'field1':'3data1','field2':'3data2','field3':3})
data3.store()
query = riak.RiakMapReduce(client).add('test')  # must be the bucket the objects were stored in
query.map(['custom_mr','mapcount'])
for result in query.run():
    print "%s" % (result)
Running this code returns 1 for each key in the bucket:
#python test.py
1
1
1
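For orientation, the ['custom_mr','mapcount'] list ends up as an Erlang phase inside the MapReduce job that is sent to Riak. The sketch below builds roughly the equivalent job in Riak's JSON MapReduce format using only the standard library. erlang_phase is a hypothetical helper, not part of the riak package, and the exact set of fields the client sends is an assumption.

```python
import json

def erlang_phase(kind, module, function, arg=None, keep=None):
    """Build one job phase in Riak's JSON MapReduce format (hypothetical helper)."""
    spec = {"language": "erlang", "module": module, "function": function}
    if arg is not None:
        spec["arg"] = arg      # passed to the Erlang function as its last argument
    if keep is not None:
        spec["keep"] = keep    # whether this phase's output is returned to the caller
    return {kind: spec}

job = {
    "inputs": "test",  # bucket name: run over every key in the bucket
    "query": [erlang_phase("map", "custom_mr", "mapcount")],
}
body = json.dumps(job, sort_keys=True)
print(body)
```

The module and function names in the phase must match a module that is loaded on every node and a function that it exports. The "undef" error in the question is Erlang's way of reporting that the function it was asked to call could not be found.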
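Note that I did not do a get before putting the new values, so if your default bucket properties include allow_mult:true, running this a second time will create a sibling for each value and you will get '2's instead of '1's.
Adding more examples
New module, compiled and installed as above: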
-module(custom_mr).
-export([mapcount/3,
         mapvalue/3,
         mapfield/3,
         mapfieldwithid/3,
         reducecount/2,
         reducepropsort/2,
         reducedoublesort/2,
         reducesort/2]).

mapcount(Obj,_Kd,_Arg) ->
    [length(riak_object:get_values(Obj))].

mapvalue(Obj,_Kd,_Arg) ->
    [hd(riak_object:get_values(Obj))].

mapfield(Obj,_Kd,Arg) ->
    Val = case catch mochijson2:decode(hd(riak_object:get_values(Obj))) of
        {struct, Data} ->
            case Arg =:= null of
                true -> Data;
                false -> [{Arg,proplists:get_value(Arg,Data)}]
            end;
        _ ->
            [{Arg,{error,notjson}}]
    end,
    [list_to_binary(mochijson2:encode(Val))].

mapfieldwithid(Obj,_Kd,Arg) ->
    Val = case catch mochijson2:decode(hd(riak_object:get_values(Obj))) of
        {struct, Data} ->
            case Arg =:= null of
                true -> Data;
                false -> [{Arg,proplists:get_value(Arg,Data)}]
            end;
        _ ->
            [{Arg,{error,notjson}}]
    end,
    V = [{bucket,riak_object:bucket(Obj)},{key,riak_object:key(Obj)}|Val],
    [list_to_binary(mochijson2:encode(V))].

reducecount(L,_Arg) ->
    [lists:sum([ N || N <- L, is_integer(N) ])].

sortfun(F) ->
    fun(A,B) ->
        proplists:get_value(F,A,<<"zzzz">>) =< proplists:get_value(F,B,<<"zzzz">>)
    end.

reducepropsort(L,Arg) ->
    Decoded = [ I || {struct,I} <- [ mochijson2:decode(E) || E <- L], is_list(I)],
    Sorted = lists:sort(sortfun(Arg), Decoded),
    [ list_to_binary(mochijson2:encode(I)) || I <- Sorted ].

reducesort(L,_Arg) ->
    lists:sort(L).

reducedoublesort(L,Arg) ->
    Decoded = [ lists:sort(I) || {struct,I} <- [ mochijson2:decode(E) || E <- L], is_list(I)],
    Sorted = lists:sort(sortfun(Arg), Decoded),
    [ list_to_binary(mochijson2:encode(I)) || I <- Sorted ].
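To make the reduce functions easier to follow, here is a rough Python mirror of reducecount/2 and reducepropsort/2. It is a sketch under stated assumptions, not something Riak runs: the sort key only approximates Erlang term ordering (numbers before strings), and it assumes the sorted field holds numbers or strings. A reduce function can be applied again to a list that already contains its own earlier output, which is why reducecount sums rather than counts.

```python
import json

def reducecount(items, arg=None):
    """Sum the integers in the list, so earlier partial sums can be fed back in."""
    return [sum(n for n in items if isinstance(n, int) and not isinstance(n, bool))]

def _sort_key(field):
    # Objects without the field fall back to "zzzz", as in sortfun/1.
    # Numbers are ordered before strings to approximate Erlang term order.
    def key(obj):
        value = obj.get(field, "zzzz")
        return (1, value) if isinstance(value, str) else (0, value)
    return key

def reducepropsort(items, arg):
    """Decode JSON objects, sort them by the field named in arg, re-encode them."""
    decoded = [json.loads(e) for e in items]
    objects = [d for d in decoded if isinstance(d, dict)]
    return [json.dumps(o) for o in sorted(objects, key=_sort_key(arg))]

# Re-reduce: an earlier partial result is combined with one more map output.
partial = reducecount([1, 1]) + [1]
total = reducecount(partial)

rows = ['{"zone": "D"}', '{"zone": "A"}', '{"zone": "C"}']
sorted_rows = reducepropsort(rows, "zone")
print(total, sorted_rows)
```

Because reducepropsort re-encodes its output as JSON strings, it can also consume its own output, which keeps it well behaved if the phase is applied more than once.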
Python code:
import riak
client = riak.RiakClient(pb_port=8087, host="172.31.0.1", protocol='pbc')
test_bucket = client.bucket('test_bucket')
data1 = test_bucket.new('key1',data={'field1':'1data1','field2':'1data2','field3':1, 'zone':'D'})
data1.store()
data2 = test_bucket.new('key2',data={'field1':'2data1','field2':'2data2','field3':2, 'zone':'A'})
data2.store()
data3 = test_bucket.new('key3',data={'field1':'3data1','field2':'3data2','field3':3, 'zone':'C'})
data3.store()
def printresult(q):
    for result in q.run():
        print "%s" % (result)
print "\nCount the number of values in the bucket"
query = riak.RiakMapReduce(client).add('test_bucket')
query.map(['custom_mr','mapcount'])
query.reduce(['custom_mr','reducecount'])
printresult(query)
print "\nList all values in natual sort order"
query = riak.RiakMapReduce(client).add('test_bucket')
query.map(['custom_mr','mapvalue'])
query.reduce(['custom_mr','reducesort'])
printresult(query)
print "\nList all values sorted by 'zone'"
query = riak.RiakMapReduce(client).add('test_bucket')
query.map(['custom_mr','mapfield'])
query.reduce(['custom_mr','reducepropsort'],{'arg':'zone'})
printresult(query)
print "\nList all values sorted by 'zone', also sort the fields in each object"
query = riak.RiakMapReduce(client).add('test_bucket')
query.map(['custom_mr','mapfield'])
query.reduce(['custom_mr','reducedoublesort'],{'arg':'zone'})
printresult(query)
print "\nList just field3, sorted"
query = riak.RiakMapReduce(client).add('test_bucket')
query.map(['custom_mr','mapfield'],{'arg':'field3'})
query.reduce(['custom_mr','reducepropsort'],{'arg':'field3'})
printresult(query)
print "\nList just bucket,key,field3, sorted by field3"
query = riak.RiakMapReduce(client).add('test_bucket')
query.map(['custom_mr','mapfieldwithid'],{'arg':'field3'})
query.reduce(['custom_mr','reducepropsort'],{'arg':'field3'})
printresult(query)
print "\nReturn just the zone for key2"
query = riak.RiakMapReduce(client).add('test_bucket','key2')
query.map(['custom_mr','mapfield'],{'arg':'zone'})
printresult(query)
print "\nReturn the bucket,key,zone for key1 and key3"
query = riak.RiakMapReduce(client).add('test_bucket',['key1','key3'])
query.map(['custom_mr','mapfieldwithid'],{'arg':'zone'})
query.reduce(['custom_mr','reducepropsort'],{'arg':'zone'})
printresult(query)
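Please note: many of these examples use full-bucket MapReduce, which is very heavy and will likely affect performance if used on a non-trivial amount of data. The last two examples show how to select a specific key or a list of keys as input. Secondary indexes or Riak Search can also be used as input if the cluster is set up with them; see the riak-python-client query inputs in the documentation.
And the output: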
# python ~/test.py
Count the number of values in the bucket
3
List all values in natual sort order
{"field2": "1data2", "field3": 1, "field1": "1data1", "zone": "D"}
{"field2": "2data2", "field3": 2, "field1": "2data1", "zone": "A"}
{"field2": "3data2", "field3": 3, "field1": "3data1", "zone": "C"}
List all values sorted by 'zone'
{"field2":"2data2","field3":2,"field1":"2data1","zone":"A"}
{"field2":"3data2","field3":3,"field1":"3data1","zone":"C"}
{"field2":"1data2","field3":1,"field1":"1data1","zone":"D"}
List all values sorted by 'zone', also sort the fields in each object
{"field1":"2data1","field2":"2data2","field3":2,"zone":"A"}
{"field1":"3data1","field2":"3data2","field3":3,"zone":"C"}
{"field1":"1data1","field2":"1data2","field3":1,"zone":"D"}
List just field3, sorted
{"field3":1}
{"field3":2}
{"field3":3}
List just bucket,key,field3, sorted by field3
{"bucket":"test_bucket","key":"key1","field3":1}
{"bucket":"test_bucket","key":"key2","field3":2}
{"bucket":"test_bucket","key":"key3","field3":3}
Return just the zone for key2
{"zone":"A"}
Return the bucket,key,zone for key1 and key3
{"bucket":"test_bucket","key":"key3","zone":"C"}
{"bucket":"test_bucket","key":"key1","zone":"D"}
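The difference between full-bucket input and key input in the note above can be sketched as data. The helper below is hypothetical and only models the "inputs" field of Riak's JSON MapReduce format, where a bare bucket name means every key in the bucket and a list of [bucket, key] pairs limits the job to those objects. How the Python client encodes its add() arguments internally is an assumption here.

```python
import json

def mapred_inputs(bucket, keys=None):
    """Return the "inputs" value for a MapReduce job (hypothetical helper)."""
    if keys is None:
        return bucket                        # whole bucket: every key is visited
    if isinstance(keys, str):
        keys = [keys]                        # accept a single key as well as a list
    return [[bucket, key] for key in keys]   # explicit bucket/key pairs

full = mapred_inputs("test_bucket")
one = mapred_inputs("test_bucket", "key2")
two = mapred_inputs("test_bucket", ["key1", "key3"])
print(json.dumps(two))
```

Listing keys explicitly avoids walking the whole bucket, which is what makes the last two queries much cheaper on a large data set.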
This question, about the exact way to send an Erlang module and Erlang function to MapReduce in the python-riak client, was originally asked on Stack Overflow: https://stackoverflow.com/questions/23870173/