问题描述
我想通过使用带有MrJob和Hadoop 2.7.1的映射器来填充Postgresql数据库。我目前使用以下代码:
I would like to populate a database of Postgresql by using a mapper with MrJob and Hadoop 2.7.1. I currently using the following code:
# -*- coding: utf-8 -*-
#Script for storing the sparse data into a database by using Hadoop
import psycopg2
import re
from mrjob.job import MRJob
args_d = False
args_c = True
args_s = True
args_n = 'es_word_space'
def unicodize(segment):
if re.match(r'\\u[0-9a-f]{4}', segment):
return segment.decode('unicode-escape')
return segment.decode('utf-8')
def create_tables(cr):
cr.execute("create table word_list(id serial primary key, word character varying not null)")
cr.execute("""create table word_sparse(
id serial primary key,
word_id integer references word_list(id) not null,
pos integer not null,
val float not null)""")
def delete_tables(cr):
cr.execute("drop table word_sparse")
cr.execute("drop table word_list")
class MRwordStore(MRJob):
def mapper(self, _, line):
global cr
item = line.strip().split('\t')
replaced = u"".join((unicodize(seg) for seg in re.split(r'(\\u[0-9a-f]{4})', item[0])))
key = u''.join((c for c in replaced if c != '"'))
cr.execute("insert into word_list(word) values(%s) returning id", (key,))
word_id = cr.fetchone()[0]
#Parse the list, literal_eval is avoided because of memory issues
inside = False
number = ""
pos = 0
val = 0
for c in item[1]:
if c == '[':
inside = True
elif c.isdigit():
number += c
elif c == ',':
if inside:
pos = int(number)
number = ""
elif c == ']':
if inside:
val = int(number)
number = ""
cr.execute("insert into word_sparse(word_id, pos, val) values (%s, %s, %s)", (word_id, pos, val))
inside = False
if __name__ == "__main__":
"""
Stores words in the database.
The first time, run with the arguments -cs.
If the database has to be recreated, run again with the d argument (-dcs)
It also asumes the owner of the database is a user named semeval with password semeval
"""
global cr
conn = psycopg2.connect("dbname=%s user=semeval password=semeval" % args_n)
cr = conn.cursor()
if args_d:
delete_tables(cr)
if args_c:
create_tables(cr)
if args_s:
MRwordStore().run()
conn.commit()
conn.close()
我尝试过通过调用我的脚本,我得到以下输出:
I tried to use not reducer. By calling my script I have this output:
$ python db_store_hadoop.py -r hadoop /almac/ignacio/data/wdSp_sparse.txt
no configs found; falling back on auto-configuration
no configs found; falling back on auto-configuration
creating tmp directory /tmp/db_store_hadoop.hduser.20160113.012419.718376
writing wrapper script to /tmp/db_store_hadoop.hduser.20160113.012419.718376/setup-wrapper.sh
Using Hadoop version 2.7.1
Copying local files into hdfs:///user/hduser/tmp/mrjob/db_store_hadoop.hduser.20160113.012419.718376/files/
PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at https://pythonhosted.org/mrjob/whats-new.html#ready-for-strict-protocols
并没有更多,似乎被绞死了。这是我的输入文件的示例:
and there is not more, it seems to be hanged. Here is a sample of my input file:
"\u00e1gil" [[1572, 1], [1590, 1], [4, 1], [774, 1]]
"\u00e1guila" [[10, 5], [1116, 2], [15, 1], [1590, 1], [1641, 2], [1704, 1], [1740, 3], [183, 1], [3, 1], [428, 2], [900, 3]]
"\u00e1guilas" [[1043, 1], [248, 1], [618, 1], [701, 2], [862, 2], [864, 2]]
"\u00e1lava" [[1572, 1], [1576, 2], [1590, 1], [726, 2]]
长度为1.5gB。我已经创建了数据库,它是空的。非常感谢您的帮助,因为我认为可能存在很多误解。
which is 1.5gB length. I already created the database and it is empty. Thank you very much for your help because I think probably there are many misconceptions.
推荐答案
每个映射器都需要自己的数据库连接。在 mapper_init()
中创建数据库连接,并在 mapper_final()
中将其关闭。您需要与mrjob脚本分开创建数据库。您应该首先尝试一些非常简单的mrjob脚本。您尚未以正确的方式启动它。浏览文档中的示例。
Each mapper needs its own database connection. Create the database connection in mapper_init()
and close it in mapper_final()
. You need to create the database separately from the mrjob script. You should try some very simple mrjob scripts first. You haven't started it the correct way. Work through the examples in the documentation.
这篇关于如何用Mrjob和Hadoop填充Postgresql数据库的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!