Как запустить шаг окна PSCP cmd в моем скрипте Python

Я запускаю Hadoop MapReduce и другие команды SSH из скрипта Python, используя модуль paramiko (код можно увидеть здесь). Когда задание MapReduce завершено, я запускаю шаг getmerge, чтобы получить результат в виде текстового файла.

Проблема в том, что затем мне нужно открыть окно cmd и запустить PSCP, чтобы загрузить файл output.txt из среды HDFS на мой компьютер. Например:

pscp [email protected]:/nfs_home/appers/cnielsen/MROutput_121815_0.txt C:\Users\cnielsen\Desktop\MR_Test

Как я могу включить этот шаг pscp в свой сценарий, чтобы мне не приходилось открывать окно cmd для его запуска после завершения моих задач MapReduce и getmerge? Я хотел бы, чтобы мой сценарий мог запускать задачу MR, задачу getmerge, а затем автоматически сохранять вывод MR на мой компьютер.

Вот мой код.


person Chris Nielsen    schedule 13.01.2016    source источник


Ответы (1)


Я решил эту проблему с помощью следующего кода. Хитрость заключалась в том, чтобы использовать модуль scp и импортировать SCPClient. См. ниже функцию scp_download(ssh).

Когда задание MapReduce завершается, запускается команда getmerge, за которой следует шаг scp_download.

import paramiko
from scp import SCPClient
import time

# Define connection info
host_ip = 'xx.xx.xx.xx'
user = 'xxxxxxxx'
pw = 'xxxxxxxx'
port = 22

# Paths
input_loc = '/nfs_home/appers/extracts/*/*.xml'
output_loc = '/user/lcmsprod/output/cnielsen/'
python_path = "/usr/lib/python_2.7.3/bin/python"
hdfs_home = '/nfs_home/appers/cnielsen/'
output_log = r'C:\Users\cnielsen\Desktop\MR_Test\MRtest011316_0.txt'

# File names
xml_lookup_file = 'product_lookups.xml'
mapper = 'Mapper.py'
reducer = 'Reducer.py'
helper_script = 'Process.py'
product_name = 'test1'
output_ref = 'test65'
target_file = 'test_011416_3.txt'

# ----------------------------------------------------
def createSSHClient(host_ip, port, user, pw):
    client = paramiko.SSHClient()
    client.load_system_host_keys()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    client.connect(host_ip, port, user, pw)
    return client
# ----------------------------------------------------
def buildMRcommand(product_name):
    space = " "
    mr_command_list = [ 'hadoop', 'jar', '/share/hadoop/tools/lib/hadoop-streaming.jar',
                        '-files', hdfs_home+xml_lookup_file,
                        '-file', hdfs_home+mapper,
                        '-file', hdfs_home+reducer,
                        '-mapper', "'"+python_path, mapper, product_name+"'",
                        '-file', hdfs_home+helper_script,
                        '-reducer', "'"+python_path, reducer+"'",
                        '-input', input_loc,
                        '-output', output_loc+output_ref]

    MR_command = space.join(mr_command_list)
    print MR_command
    return MR_command
# ----------------------------------------------------
def unbuffered_lines(f):
    line_buf = ""
    while not f.channel.exit_status_ready():
        line_buf += f.read(1)
        if line_buf.endswith('\n'):
            yield line_buf
            line_buf = ""
# ----------------------------------------------------
def stream_output(stdin, stdout, stderr):
    writer = open(output_log, 'w')
    # Using line_buffer function
    for l in unbuffered_lines(stderr):
        e = '[stderr] ' + l
        print '[stderr] ' + l.strip('\n')
        writer.write(e)

    # gives full listing..
    for line in stdout:
        r = '[stdout]' + line
        print '[stdout]' + line.strip('\n')
        writer.write(r)
    writer.close()
# ----------------------------------------------------
def run_MapReduce(ssh):
    stdin, stdout, stderr = ssh.exec_command(buildMRcommand(product_name))
    stream_output(stdin, stdout, stderr)
    return 1
# ----------------------------------------------------
def run_list_dir(ssh):
    list_dir = "ls "+hdfs_home+" -l"
    stdin, stdout, stderr = ssh.exec_command(list_dir)
    stream_output(stdin, stdout, stderr)
# ----------------------------------------------------
def run_getmerge(ssh):
    getmerge = "hadoop fs -getmerge "+output_loc+output_ref+" "+hdfs_home+target_file
    print getmerge
    stdin, stdout, stderr = ssh.exec_command(getmerge)
    for line in stdout:
        print '[stdout]' + line.strip('\n')
    time.sleep(1.5)
    return 1
# ----------------------------------------------------
def scp_download(ssh):
    scp = SCPClient(ssh.get_transport())
    print "Fetching SCP data.."
    scp.get(hdfs_home+target_file, local_dir)
    print "File download complete."
# ----------------------------------------------------
def main():
    # Get the ssh connection
    global ssh
    ssh = createSSHClient(host_ip, port, user, pw)
    print "Executing command..."

    # Command list
    ##run_list_dir(ssh)
    ##run_getmerge(ssh)
    ##scp_download(ssh)

    # Run MapReduce
    MR_status = 0
    MR_status = run_MapReduce(ssh)

    if MR_status == 1:
        gs = 0
        gs = run_getmerge(ssh)
        if gs == 1:
            scp_download(ssh)

    # Close ssh connection
    ssh.close()

if __name__ == '__main__':
    main()
person Chris Nielsen    schedule 14.01.2016
comment
Примечание. Похоже, что команда getmerge не предоставляет код выхода, поэтому я использовал в сценарии ожидание time.sleep(1.5), чтобы шаг scp_download не начинался до завершения задания getmerge. - person Chris Nielsen; 14.01.2016