Can anyone give an example of sending an Erlang module and an Erlang function to

query.map()


in the Python Riak client? The documentation describes it like this:

function (string, list) – Either a named Javascript function (ie: ‘Riak.mapValues’), or an anonymous javascript function (ie: ‘function(...) ... ‘ or an array [‘erlang_module’, ‘function’].
options (dict) – phase options, containing ‘language’, ‘keep’ flag, and/or ‘arg’.


But there is no clear information about what exactly I have to send. What I have actually been passing to the query.map() phase is

query.map(['maps','fun']) # maps is the maps.erl and fun is the function in the maps.erl file


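As the documentation describes, I have set the beam file path in app.config to the directory that holds the compiled beam files. I did all of this, but after running the command I get an error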

 query.map(['maps','funs'])
 >>> query.run()
 Traceback (most recent call last):
 File "<input>", line 1, in <module>
 File "/usr/lib/python2.6/site-packages/riak-1.5.2-py2.6.egg/riak/mapreduce.py", line 234, in run
 result = t.mapred(self._inputs, query, timeout)
 File "/usr/lib/python2.6/site-packages/riak-1.5.2-py2.6.egg/riak/transports/http.py", line 322, in mapred
(repr(response[0]), repr(response[1])))
 Exception: Error running MapReduce operation. Headers: {'date': 'Mon, 26 May 2014 11:24:04 GMT', 'content-length': '1121', 'content-type': 'application/json'
, 'http_code': 500, 'server': 'MochiWeb/1.1 WebMachine/1.10.0 (never breaks eye contact)'} Body: '{"phase":0,"error":"undef","input":"{ok,{r_object,<<\\"tst\
\">>,<<\\"test5\\">>,[{r_content,{dict,3,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[[<<\\"X-Riak-VTag\\
">>,50,53,75,69,55,80,113,109,65,69,117,106,109,109,99,65,72,101,75,82,115,86]],[[<<\\"index\\">>]],[],[[<<\\"X-Riak-Last-Modified\\">>|{1400,340359,663135}]
],[],[]}}},<<\\"6\\">>}],[{<<197,82,177,11,83,115,139,10>>,{1,63567559559}}],   {dict,1,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],
[],[],[],[],[],[],[],[],[],...}}},...},...}","type":"error","stack":"[{maps,funs,  [{r_object,<<\\"tst\\">>,<<\\"test5\\">>,[{r_content,{dict,3,16,16,8,80,48,{
[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[], [[<<\\"X-Riak-VTag\\">>,50,53,75,69,55,80,113,109,65,69,117,106,109,109,99,6
5,72,101,75,82,115,86]],[[<<\\"index\\">>]],[],[[<<\\"X-Riak-Last-Modified\\">>|{1400,340359,663135}]],[],[]}}},<<\\"6\\">>}],[{<<197,82,177,11,83,115,139,10
>>,{1,63567559559}}],{dict,1,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],...}}},...},...],...},...]"}'


What am I missing or doing wrong? Please advise.

Best answer

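Using an Erlang map function from the Python client has 3 parts:


1. Writing and compiling the Erlang module
2. Preparing the Riak cluster
3. Calling the function from the Python client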


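The Erlang module should be fairly straightforward. For this example, I will have the map function return the number of values (siblings) for each key: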

-module(custom_mr).

-export([mapcount/3]).

mapcount(Obj,_Keydata,_Arg) ->
  [length(riak_object:get_values(Obj))].


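Erlang versions differ in subtle ways, so it is safest to compile with the Erlang that is bundled with Riak or, if you built Riak from source, with the same version you built it with. The resulting .beam file needs to be placed in a directory that is readable by the user Riak runs as, which defaults to riak if you installed from a package. You will need to deploy the .beam file and modify app.config on every node in the cluster.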

# /usr/lib/riak/erts-5.9.1/bin/erlc custom_mr.erl
# mkdir /var/lib/riak/custom_code
# mv custom_mr.beam /var/lib/riak/custom_code
# chown -R riak:riak /var/lib/riak/custom_code


Then edit app.config, add {add_paths,["/var/lib/riak/custom_code"]} to the riak_kv section, and restart the node.
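Before restarting, it can help to confirm that the directory named in add_paths really contains the compiled module. The following is only a sketch: compiled_modules is a hypothetical helper, not part of Riak or the Python client. It relies on the Erlang convention that a compiled module is stored as <module>.beam, so the first element of the list passed to query.map() should appear in its result.

```python
import os


def compiled_modules(code_dir):
    """Return the module names that have a .beam file in code_dir.

    Hypothetical helper for illustration only. It assumes the usual
    Erlang convention that module foo is compiled to foo.beam.
    """
    names = []
    for entry in os.listdir(code_dir):
        base, ext = os.path.splitext(entry)
        if ext == ".beam":
            names.append(base)
    return sorted(names)
```

On a node set up as above, compiled_modules('/var/lib/riak/custom_code') would be expected to include 'custom_mr'. It has to be run on each node, because every node loads code from its own disk.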

Test from riak attach to verify that the new module has been loaded. In this example, nodes 1-4 have loaded the module, but node5 is down:

# riak attach
1> riak_core_util:rpc_every_member_ann(code,which,[custom_mr]).
{[{'[email protected]',"/var/lib/riak/custom_code/custom_mr.beam"},
  {'[email protected]',"/var/lib/riak/custom_code/custom_mr.beam"},
  {'[email protected]',"/var/lib/riak/custom_code/custom_mr.beam"},
  {'[email protected]',"/var/lib/riak/custom_code/custom_mr.beam"}],
 ['[email protected]']}
2> custom_mr:mapcount(riak_object:new(<<"test">>,<<"test">>,<<"test">>),keydata,arg).
[1]


(Detach from the Riak console with ctrl-d if you are running a version before 1.4, otherwise with ctrl-c a)
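The "undef" in the question's error body is the Erlang error for calling a function that cannot be found, for example because the module is not loaded on the node or does not export a function with that name and arity. The stack in that body names maps:funs, which is exactly the kind of problem the riak attach check above rules out. As a small sketch, the interesting fields can be pulled out of such a body with the standard json module. summarize_mapred_error is a hypothetical helper, and sample_body is a shortened stand-in for the real body, not a copy of it.

```python
import json


def summarize_mapred_error(body):
    """Return (phase, error) from a Riak MapReduce JSON error body."""
    info = json.loads(body)
    return info.get("phase"), info.get("error")


# Shortened, made-up body with the same top-level fields as the one
# shown in the question.
sample_body = ('{"phase":0,"error":"undef","type":"error",'
               '"stack":"[{maps,funs,[...]}]"}')

phase, reason = summarize_mapred_error(sample_body)
print("phase %s failed with %s" % (phase, reason))
```

Phase 0 is the first phase of the query, which in the question is the map phase.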

Finally, the Python code (I used the file name test.py):

import riak

client = riak.RiakClient()

test_bucket = client.bucket('test')

data1 = test_bucket.new('key1',data={'field1':'1data1','field2':'1data2','field3':1})
data1.store()

data2 = test_bucket.new('key2',data={'field1':'2data1','field2':'2data2','field3':2})
data2.store()

data3 = test_bucket.new('key3',data={'field1':'3data1','field2':'3data2','field3':3})
data3.store()

query = riak.RiakMapReduce(client).add('test')  # same bucket the objects were stored in
query.map(['custom_mr','mapcount'])

for result in query.run():
    print "%s" % (result)


Running this code returns 1 for each key in the bucket:

#python test.py
1
1
1


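Note that I did not do a get before putting the new values, so if your default bucket properties include allow_mult: true, running this a second time will create a sibling for each value, and you will get "2"s instead of "1"s.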
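To make the ['custom_mr','mapcount'] argument less mysterious, here is a sketch of the MapReduce job that, as far as I understand Riak's HTTP interface, such a query corresponds to: an Erlang phase is described by a "language", a "module" and a "function", plus the optional "keep" and "arg" settings mentioned in the client documentation. erlang_phase is a hypothetical helper written for this illustration. It is not how the client builds the request internally.

```python
import json


def erlang_phase(kind, module, function, arg=None, keep=False):
    """Build one Erlang phase ('map' or 'reduce') of a MapReduce job."""
    spec = {
        "language": "erlang",
        "module": module,
        "function": function,
        "keep": keep,
    }
    if arg is not None:
        spec["arg"] = arg
    return {kind: spec}


# Full-bucket input plus a single map phase, mirroring test.py.
job = {
    "inputs": "test",
    "query": [erlang_phase("map", "custom_mr", "mapcount", keep=True)],
}

print(json.dumps(job, sort_keys=True))
```

The module and function names here are plain strings. Riak resolves them on the server, which is why the .beam file has to be present on every node.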


Adding more examples



New module, compiled and installed as above

-module(custom_mr).

-export([mapcount/3,
         mapvalue/3,
         mapfield/3,
         mapfieldwithid/3,
         reducecount/2,
         reducepropsort/2,
         reducedoublesort/2,
         reducesort/2]).

mapcount(Obj,_Kd,_Arg) ->
  [length(riak_object:get_values(Obj))].

mapvalue(Obj,_Kd,_Arg) ->
  [hd(riak_object:get_values(Obj))].

mapfield(Obj,_Kd,Arg) ->
  Val = case catch mochijson2:decode(hd(riak_object:get_values(Obj))) of
    {struct, Data} ->
        case Arg =:= null of
          true -> Data;
          false -> [{Arg,proplists:get_value(Arg,Data)}]
        end;
    _ ->
        [{Arg,{error,notjson}}]
  end,
  [list_to_binary(mochijson2:encode(Val))].

mapfieldwithid(Obj,_Kd,Arg) ->
  Val = case catch mochijson2:decode(hd(riak_object:get_values(Obj))) of
    {struct, Data} ->
        case Arg =:= null of
          true -> Data;
          false -> [{Arg,proplists:get_value(Arg,Data)}]
        end;
    _ ->
        [{Arg,{error,notjson}}]
  end,
  V = [{bucket,riak_object:bucket(Obj)},{key,riak_object:key(Obj)}|Val],
  [list_to_binary(mochijson2:encode(V))].

reducecount(L,_Arg) ->
  [lists:sum([ N || N <- L, is_integer(N) ])].

sortfun(F) ->
  fun(A,B) ->
    proplists:get_value(F,A,<<"zzzz">>) =< proplists:get_value(F,B,<<"zzzz">>)
  end.

reducepropsort(L,Arg) ->
  Decoded = [ I || {struct,I} <- [ mochijson2:decode(E) || E <- L], is_list(I)],
  Sorted = lists:sort(sortfun(Arg), Decoded),
  [ list_to_binary(mochijson2:encode(I)) || I <- Sorted ].

reducesort(L,_Arg) ->
  lists:sort(L).

reducedoublesort(L,Arg) ->
  Decoded = [ lists:sort(I) || {struct,I} <- [ mochijson2:decode(E) || E <- L], is_list(I)],
  Sorted = lists:sort(sortfun(Arg), Decoded),
  [ list_to_binary(mochijson2:encode(I)) || I <- Sorted ].
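For readers who are more at home in Python, the following is a rough model of what reducepropsort/2 does: decode each input as JSON, keep only the JSON objects, sort them by one field, and encode them again. It is a sketch for understanding, not a replacement for the Erlang function. reduce_prop_sort is a hypothetical name, and the model assumes the sort field holds strings in every object, because Python does not order mixed types the way Erlang's term ordering does.

```python
import json


def reduce_prop_sort(values, field):
    """Python model of reducepropsort/2 from the Erlang module above."""
    # Keep only inputs that decode to JSON objects, like the
    # {struct, I} pattern in the Erlang list comprehension.
    decoded = [obj for obj in (json.loads(v) for v in values)
               if isinstance(obj, dict)]
    # Objects without the field sort as "zzzz", like the default
    # passed to proplists:get_value/3 in sortfun/1.
    decoded.sort(key=lambda obj: obj.get(field, "zzzz"))
    return [json.dumps(obj, separators=(",", ":")) for obj in decoded]


inputs = [
    '{"field1":"1data1","zone":"D"}',
    '{"field1":"2data1","zone":"A"}',
    '{"field1":"3data1","zone":"C"}',
]
for line in reduce_prop_sort(inputs, "zone"):
    print(line)
```

One thing to keep in mind is that Riak may call a reduce function several times on partial results, so a reduce function should accept its own output as input. This model does, since it returns the same kind of JSON strings it takes in.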


Python code

import riak

client = riak.RiakClient(pb_port=8087, host="172.31.0.1", protocol='pbc')

test_bucket = client.bucket('test_bucket')

data1 = test_bucket.new('key1',data={'field1':'1data1','field2':'1data2','field3':1, 'zone':'D'})
data1.store()

data2 = test_bucket.new('key2',data={'field1':'2data1','field2':'2data2','field3':2, 'zone':'A'})
data2.store()

data3 = test_bucket.new('key3',data={'field1':'3data1','field2':'3data2','field3':3, 'zone':'C'})
data3.store()


def printresult(q):
  for result in q.run():
    print "%s" % (result)

print "\nCount the number of values in the bucket"
query = riak.RiakMapReduce(client).add('test_bucket')
query.map(['custom_mr','mapcount'])
query.reduce(['custom_mr','reducecount'])
printresult(query)

print "\nList all values in natual sort order"
query = riak.RiakMapReduce(client).add('test_bucket')
query.map(['custom_mr','mapvalue'])
query.reduce(['custom_mr','reducesort'])
printresult(query)

print "\nList all values sorted by 'zone'"
query = riak.RiakMapReduce(client).add('test_bucket')
query.map(['custom_mr','mapfield'])
query.reduce(['custom_mr','reducepropsort'],{'arg':'zone'})
printresult(query)

print "\nList all values sorted by 'zone', also sort the fields in each object"
query = riak.RiakMapReduce(client).add('test_bucket')
query.map(['custom_mr','mapfield'])
query.reduce(['custom_mr','reducedoublesort'],{'arg':'zone'})
printresult(query)

print "\nList just field3, sorted"
query = riak.RiakMapReduce(client).add('test_bucket')
query.map(['custom_mr','mapfield'],{'arg':'field3'})
query.reduce(['custom_mr','reducepropsort'],{'arg':'field3'})
printresult(query)

print "\nList just bucket,key,field3, sorted by field3"
query = riak.RiakMapReduce(client).add('test_bucket')
query.map(['custom_mr','mapfieldwithid'],{'arg':'field3'})
query.reduce(['custom_mr','reducepropsort'],{'arg':'field3'})
printresult(query)

print "\nReturn just the zone for key2"
query = riak.RiakMapReduce(client).add('test_bucket','key2')
query.map(['custom_mr','mapfield'],{'arg':'zone'})
printresult(query)

print "\nReturn the bucket,key,zone for key1 and key3"
query = riak.RiakMapReduce(client).add('test_bucket',['key1','key3'])
query.map(['custom_mr','mapfieldwithid'],{'arg':'zone'})
query.reduce(['custom_mr','reducepropsort'],{'arg':'zone'})
printresult(query)


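Please note: many of these examples use full-bucket MapReduce, which is very heavy and will likely affect performance if used on a non-trivial amount of data. The last two examples show how to select a specific key or a list of keys as input. If the cluster is set up with secondary indexes or Riak Search, those can also be used as input; see the riak-python-client query inputs in the documentation.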

And the output:

# python ~/test.py

Count the number of values in the bucket
3

List all values in natual sort order
{"field2": "1data2", "field3": 1, "field1": "1data1", "zone": "D"}
{"field2": "2data2", "field3": 2, "field1": "2data1", "zone": "A"}
{"field2": "3data2", "field3": 3, "field1": "3data1", "zone": "C"}

List all values sorted by 'zone'
{"field2":"2data2","field3":2,"field1":"2data1","zone":"A"}
{"field2":"3data2","field3":3,"field1":"3data1","zone":"C"}
{"field2":"1data2","field3":1,"field1":"1data1","zone":"D"}

List all values sorted by 'zone', also sort the fields in each object
{"field1":"2data1","field2":"2data2","field3":2,"zone":"A"}
{"field1":"3data1","field2":"3data2","field3":3,"zone":"C"}
{"field1":"1data1","field2":"1data2","field3":1,"zone":"D"}

List just field3, sorted
{"field3":1}
{"field3":2}
{"field3":3}

List just bucket,key,field3, sorted by field3
{"bucket":"test_bucket","key":"key1","field3":1}
{"bucket":"test_bucket","key":"key2","field3":2}
{"bucket":"test_bucket","key":"key3","field3":3}

Return just the zone for key2
{"zone":"A"}

Return the bucket,key,zone for key1 and key3
{"bucket":"test_bucket","key":"key3","zone":"C"}
{"bucket":"test_bucket","key":"key1","zone":"D"}
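Coming back to the earlier note about inputs, the sketch below shows the three shapes the "inputs" part of a MapReduce job can take, as I understand Riak's HTTP interface: a bucket name (full-bucket scan), a list of [bucket, key] pairs, or a secondary-index range. The helper names are hypothetical, and field3_int is an assumed index name that the example objects above do not actually define.

```python
def bucket_input(bucket):
    """Full-bucket input: every key is scanned, so this is the heavy one."""
    return bucket


def key_inputs(bucket, keys):
    """Explicit [bucket, key] pairs, as in the last two examples."""
    return [[bucket, key] for key in keys]


def index_range_input(bucket, index, start, end):
    """Secondary-index range input (requires a backend with 2i support)."""
    return {"bucket": bucket, "index": index, "start": start, "end": end}


print(bucket_input("test_bucket"))
print(key_inputs("test_bucket", ["key1", "key3"]))
# "field3_int" is a made-up index name for illustration.
print(index_range_input("test_bucket", "field3_int", 1, 3))
```

Narrowing the inputs this way means the map phase only touches the selected objects, instead of every object in the bucket.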

Source: the Stack Overflow question "python - What is the exact way to send an Erlang module and Erlang function to a MapReduce phase in the python-riak client": https://stackoverflow.com/questions/23870173/
