HDFS lives on a remote server (hdfs_server). I can ssh user@hdfs_server and read and write with cat and put respectively, but I have been asked not to touch HDFS directly (except to write files to it remotely). I need to read from and write to HDFS from my local machine.

How can I do this with Python? I found the code below, which seems to do exactly that, but I cannot get it to run on my local machine so that it reads from and writes to the remote HDFS:

import requests
import json
import os
import kerberos
import sys

node = os.getenv("namenode").split(",")
print (node)

local_file_path = sys.argv[1]
remote_file_path = sys.argv[2]
read_or_write = sys.argv[3]
print (local_file_path,remote_file_path)

def check_node_status(node):
        for name in node:
                print (name)
                request = requests.get("%s/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus"%name,
                                       verify=False).json()
                status = request["beans"][0]["State"]
                print (name)
                print (status)
                if status =="active":
                        break
        return status,name

def kerberos_auth():
        __, krb_context = kerberos.authGSSClientInit("HTTP@hdfs_server")
        kerberos.authGSSClientStep(krb_context, "")
        negotiate_details = kerberos.authGSSClientResponse(krb_context)
        headers = {"Authorization": "Negotiate " + negotiate_details,
                    "Content-Type":"application/binary"}
        return headers

def kerberos_hdfs_upload(status,name,headers):
        if status =="active":
                data=open('%s'%local_file_path, 'rb').read()
                write_req = requests.put("%s/webhdfs/v1%s?op=CREATE&overwrite=true"%(name,remote_file_path),
                                         headers=headers,
                                         verify=False,
                                         allow_redirects=True,
                                         data=data)
                print(write_req.text)

def kerberos_hdfs_read(status,name,headers):
        print(status)
        if status == "active":
                read = requests.get("%s/webhdfs/v1%s?op=OPEN"%(name,remote_file_path),
                                    headers=headers,
                                    verify=False,
                                    allow_redirects=True)

                if read.status_code == 200:
                        data=open('%s'%local_file_path, 'wb')
                        data.write(read.content)
                        data.close()
                else :
                        print(read.content)


status, name= check_node_status(node)
headers = kerberos_auth()
if read_or_write == "write":
        kerberos_hdfs_upload(status,name,headers)
elif read_or_write == "read":
        print("fun")
        kerberos_hdfs_read(status,name,headers)

When I run it on my local machine, it unsurprisingly fails at line 7 (node = os.getenv("namenode").split(",")), because there is no namenode in the local environment. So how do I need to modify this code so that it can read from and write to HDFS? This is really only my second day working with HDFS, so I have no idea what is going on and no clue how to make this work. Any help is greatly appreciated.
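With the variable unset, os.getenv("namenode") returns None, so it is the .split(",") that actually raises (an AttributeError). Just as a sketch, a minimal guard that fails with a readable message instead:

import os
import sys

# Sketch only: read the comma-separated NameNode list from the environment
# and exit with a clear message instead of letting None.split(",") raise.
namenode_env = os.getenv("namenode")
if not namenode_env:
    sys.exit("environment variable 'namenode' is not set")
node = namenode_env.split(",")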

Edit: after doing export namenode=hdfs_server, the namenode error went away. But when I run the script again, I get a new error:
Traceback (most recent call last):
  File "read_write_hdfs.py", line 9, in <module>
    local_file_path = sys.argv[1]
IndexError: list index out of range
Error in sys.excepthook:
Traceback (most recent call last):
  File "/usr/lib/python3/dist-packages/apport_python_hook.py", line 63, in apport_excepthook
    from apport.fileutils import likely_packaged, get_recent_crashes
  File "/usr/lib/python3/dist-packages/apport/__init__.py", line 5, in <module>
    from apport.report import Report
  File "/usr/lib/python3/dist-packages/apport/report.py", line 30, in <module>
    import apport.fileutils
  File "/usr/lib/python3/dist-packages/apport/fileutils.py", line 23, in <module>
    from apport.packaging_impl import impl as packaging
  File "/usr/lib/python3/dist-packages/apport/packaging_impl.py", line 23, in <module>
    import apt
  File "/usr/lib/python3/dist-packages/apt/__init__.py", line 23, in <module>
    import apt_pkg
ModuleNotFoundError: No module named 'apt_pkg'

Original exception was:
Traceback (most recent call last):
  File "read_write_hdfs.py", line 9, in <module>
    local_file_path = sys.argv[1]
IndexError: list index out of range

So I tried running the script with 3 arguments (one each for sys.argv[1], sys.argv[2] and sys.argv[3]). Now I get the following error:
$ python3 read_write_hdfs.py /home/embs/Desktop/hdfs_test/ /home/edhuser/testdata.txt read
['hdfs_server']
/home/embs/Desktop/hdfs_test/ /home/edhuser/testdata.txt
/home/embs/Desktop/hdfs_test/ /home/edhuser/testdata.txt
hdfs_server
Traceback (most recent call last):
  File "read_write_hdfs.py", line 64, in <module>
    status, name= check_node_status(node)
  File "read_write_hdfs.py", line 22, in check_node_status
    verify=False).json()
  File "/usr/lib/python3/dist-packages/requests/api.py", line 67, in get
    return request('get', url, params=params, **kwargs)
  File "/usr/lib/python3/dist-packages/requests/api.py", line 53, in request
    return session.request(method=method, url=url, **kwargs)
  File "/usr/lib/python3/dist-packages/requests/sessions.py", line 468, in request
    resp = self.send(prep, **send_kwargs)
  File "/usr/lib/python3/dist-packages/requests/sessions.py", line 570, in send
    adapter = self.get_adapter(url=request.url)
  File "/usr/lib/python3/dist-packages/requests/sessions.py", line 644, in get_adapter
    raise InvalidSchema("No connection adapters were found for '%s'" % url)
requests.exceptions.InvalidSchema: No connection adapters were found for 'hdfs_server/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus'
Error in sys.excepthook:
Traceback (most recent call last):
  File "/usr/lib/python3/dist-packages/apport_python_hook.py", line 63, in apport_excepthook
    from apport.fileutils import likely_packaged, get_recent_crashes
  File "/usr/lib/python3/dist-packages/apport/__init__.py", line 5, in <module>
    from apport.report import Report
  File "/usr/lib/python3/dist-packages/apport/report.py", line 30, in <module>
    import apport.fileutils
  File "/usr/lib/python3/dist-packages/apport/fileutils.py", line 23, in <module>
    from apport.packaging_impl import impl as packaging
  File "/usr/lib/python3/dist-packages/apport/packaging_impl.py", line 23, in <module>
    import apt
  File "/usr/lib/python3/dist-packages/apt/__init__.py", line 23, in <module>
    import apt_pkg
ModuleNotFoundError: No module named 'apt_pkg'

Original exception was:
Traceback (most recent call last):
  File "read_write_hdfs.py", line 66, in <module>
    status, name= check_node_status(node)
  File "read_write_hdfs.py", line 24, in check_node_status
    verify=False).json()
  File "/usr/lib/python3/dist-packages/requests/api.py", line 67, in get
    return request('get', url, params=params, **kwargs)
  File "/usr/lib/python3/dist-packages/requests/api.py", line 53, in request
    return session.request(method=method, url=url, **kwargs)
  File "/usr/lib/python3/dist-packages/requests/sessions.py", line 468, in request
    resp = self.send(prep, **send_kwargs)
  File "/usr/lib/python3/dist-packages/requests/sessions.py", line 570, in send
    adapter = self.get_adapter(url=request.url)
  File "/usr/lib/python3/dist-packages/requests/sessions.py", line 644, in get_adapter
    raise InvalidSchema("No connection adapters were found for '%s'" % url)
requests.exceptions.InvalidSchema: No connection adapters were found for 'hdfs_server/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus'

Since the traceback points at the function check_node_status(node), I think the script must be failing to connect to hdfs_server. How can I fix this?

Best Answer

os.getenv("namenode")正在寻找环境变量namenode

So export that env var and try running the script again:

export namenode=hdfs_server

I am assuming hdfs_server is not the actual server name. If that is the actual command you typed, then it is not a hostname but an ssh alias, and you need to look in ~/.ssh/config to find the real hostname.
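Note also what the second traceback is saying: requests rejects the URL itself (InvalidSchema), because 'hdfs_server/jmx?...' has no scheme. The script builds its URLs as "%s/jmx?..."%name and "%s/webhdfs/v1%s"%(name,remote_file_path), so whatever you export as namenode has to be a full base URL, with http:// (or https://) and the NameNode's web port, not just a hostname. Below is a minimal sketch of the same status check the script performs, using a placeholder hostname and the usual default NameNode HTTP port (50070 on Hadoop 2.x, 9870 on Hadoop 3.x); adjust both to your cluster:

import requests

# Placeholder values: substitute the real NameNode hostname and port.
namenode_url = "http://namenode.example.com:50070"

# Same JMX query as the script's check_node_status(), but against a base URL
# that includes a scheme, so requests can actually open a connection.
resp = requests.get(
    "%s/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus" % namenode_url,
    verify=False)
print(resp.json()["beans"][0]["State"])  # should print "active" on the active NameNode

Once that works, export namenode=http://<actual-host>:<port> (comma-separated if you have an HA pair of NameNodes, since the script splits the variable on ",") and the rest of the WebHDFS calls should at least reach the server.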

A similar question on Stack Overflow: python - How to remotely read and write HDFS with Python? https://stackoverflow.com/questions/48535980/
