A Generic Paramiko Wrapper
This is a generic class that wraps Paramiko SSH functionality:
#!/usr/bin/env python # -*- coding: UTF-8 -*- import paramiko import logging logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) class SSHClient(object): def __init__(self, host, username, password): self.connection = self.connect(host, username, password) @staticmethod def connect(host, username, password): """ Initiates an SSH Connection to the Host """ ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: ssh.connect(host, username=username, password=password) logger.info("Connection Created (host = {}, username = {}, password = *****)".format(host, username)) except ValueError: logger.error("Connection Failed") return ssh def execute(self, command): """ Executes a command via SSH and returns results on StdOut """ logger.debug('Executing command on std out:\n\t{}'.format(command.strip())) return self.connection.exec_command(command.strip()) def close(self): self.connection.close()
Usage looks like this (assuming a dictionary of credential information):
ssh = SSHClient(creds["host"], creds["user"], creds["pass"]) # create the remote directory structure ssh_stdin, ssh_stdout, ssh_stderr = ssh.execute("the command here") logger.info("Results:\n\tstd-in = {}\n\tstd-out = {}\n\tstd-err = {}".format( ssh_stdin, ssh_stdout, ssh_stderr )) ssh.close()
Using Paramiko and SCP
The following snippet illustrates the use of SCP to upload files to remote locations on multiple servers:#!/usr/bin/env python # -*- coding: UTF-8 -*- import os import logging from remote import SSHClient logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) class FileUploader(object): def __init__(self, version, creds_list): self.version = version self.creds_list = creds_list def get_remote_directory(self): return "/disks/sdb/files/{}".format( self.version) def get_create_remote_directory_cmd(self): return "mkdir -p {}".format( self.get_remote_directory()) @staticmethod def get_local_directory(): directory = os.environ['CSV_DIR'] return "cd {}".format(directory) def create_remote_directories(self, creds): ssh = SSHClient(creds["host"], creds["user"], creds["pass"]) # create the remote directory structure ssh_stdin, ssh_stdout, ssh_stderr = ssh.execute(self.get_create_remote_directory_cmd()) logger.info("Results:\n\tstd-in = {}\n\tstd-out = {}\n\tstd-err = {}".format( ssh_stdin, ssh_stdout, ssh_stderr )) ssh.close() def upload_from_local(self, creds): local_path = "{}/*.json".format( os.environ["CSV_DIR"]) cmd = "scp {} {}@{}:{}".format( local_path, creds["user"], creds["host"], self.get_remote_directory() ) os.system(cmd) def process(self): for creds in self.creds_list: self.create_remote_directories(creds) self.upload_from_local(creds) def get_creds_list(): creds_m1 = { "host": os.environ["HOST_1"], "user": os.environ["USER_1"], "pass": os.environ["PASS_1"] } creds_m2 = { "host": os.environ["HOST_2"], "user": os.environ["USER_2"], "pass": os.environ["PASS_2"] } return [creds_m1, creds_m2] if __name__ == "__main__": from file_uploader import FileUploader uploader = FileUploader("2.0", get_creds_list()) uploader.process()
Paramiko and Apache HIVE
If Hive is exposed on a port as a service on your server, there are better solutions for HIVE access, such as pyhs2. pyHS2 is a python client driver for connecting to hive server 2.
However, there are times when Apache HIVE is not enabled as a service remotely accessible via a server and port. In these situations, access via SSH may be a possibility.
#!/usr/bin/env python # -*- coding: UTF-8 -*- import os import paramiko import logging logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) host = os.environ['HIVE_HOST'] username = os.environ['HIVE_USERNAME'] password = os.environ['HIVE_PASSWORD'] hive_location = os.environ['HIVE_LOCATION'] class HiveDbReader(object): """ Purpose: An instance of a HiveDbReader Object """ def __init__(self): """Return a connector object""" logger.debug("Initialized HiveDbReader instance") def get_ssh_connection(self): """ Initiates an SSH Connection to the Host """ ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: ssh.connect(host, username=username, password=password) logger.info("Connection Created (host = {}, username = {}, password = *****)".format(host, username)) except ValueError: logger.error("Connection Failed") return ssh def execute_on_stdout(self, ssh, hive_sql): """ Executes a HIVE SQL Command via SSH and returns results on StdOut Sample Command: /loc/to/hive -e 'select * from table where xyz' """ hive_sql = hive_sql.strip() hive_command = '{} -e "{}"'.format(hive_location, hive_sql) logger.debug('Executing HIVE Command on StdOut:\n\t{}'.format(hive_command)) return ssh.exec_command(hive_command) def get_employees(self): the_employees = [] hive_select_cat_sql = """ select employees from mydb group by employees; """.strip() ssh = self.get_ssh_connection() ssh_stdin, ssh_stdout, ssh_stderr = self.execute_on_stdout(ssh, hive_select_cat_sql) try: for line in ssh_stdout.readlines(): an_employee = line.replace('"', '').strip() the_employees.append(an_employee) except Exception as e: logger.error("Hive Command Access Failed") ssh.close() logger.info("Returned employees (total = {})".format(len(the_employees))) return the_employees
No comments:
Post a Comment