玖叶教程网

前端编程开发入门

python远程控制模块paramiko

paramiko介绍

paramiko是一个用于做远程控制的模块,使用该模块可以对远程服务器进行命令或文件操作,值得一说的是,fabric和ansible内部的远程管理就是使用的paramiko来现实。其实它的底层是对ssh的上层代码的一个封装。

ssh是一个协议,OpenSSH是其中一个开源实现,paramiko是Python的一个库,实现了SSHv2协议(底层使用cryptography)。

有了Paramiko以后,我们就可以在Python代码中直接使用SSH协议对远程服务器执行操作,而不是通过ssh命令对远程服务器进行操作。

安装

paramiko属于第三方库,依赖pycrypto库:

pip3 install pycrypto # Windows安装失败,安装pycrypto?dome 或者 pycryptodomex.pycryptodomex 一个独立于旧 PyCrypto 的库所有模块都安装在Cryptodome包下。旧的 PyCrypto 和 PyCryptodome 可以共存

pip3 install pycryptodomex

pip3 install paramiko

paramiko核心组件-SSHClient和SFTPClient

  • SSHClient的作用类似于Linux的ssh命令,是对SSH会话的封装,该类封装了传输(Transport),通道(Channel)及SFTPClient建立的方法(open_sftp),通常用于执行远程命令。
  • SFTPClient的作用类似与Linux的sftp命令,是对SFTP客户端的封装,用以实现远程文件操作,如文件上传、下载、修改文件权限等操作。



SSHClient常用的方法介绍

  • connect():实现远程服务器的连接与认证,对于该方法只有hostname是必传参数。

常用参数:

hostname 连接的目标主机
port=SSH_PORT 指定端口
username=None 验证的用户名
password=None 验证的用户密码
pkey=None 私钥方式用于身份验证
key_filename=None 一个文件名或文件列表,指定私钥文件
timeout=None 可选的tcp连接超时时间
allow_agent=True, 是否允许连接到ssh代理,默认为True 允许
look_for_keys=True 是否在~/.ssh中搜索私钥文件,默认为True 允许
compress=False, 是否打开压缩
  • set_missing_host_key_policy():设置远程服务器没有在know_hosts文件中记录时的应对策略。目前支持三种策略:

设置连接的远程主机没有本地主机密钥或HostKeys对象时的策略,目前支持三种:

  • AutoAddPolicy 自动添加主机名及主机密钥到本地HostKeys对象,不依赖load_system_host_key的配置。即新建立ssh连接时不需要再输入yes或no进行确认;
  • WarningPolicy 用于记录一个未知的主机密钥的python警告。并接受,功能上和AutoAddPolicy类似,但是会提示是新连接;
  • RejectPolicy 自动拒绝未知的主机名和密钥,依赖load_system_host_key的配置。此为默认选项。
  • exec_command():在远程服务器执行Linux命令的方法。
  • open_sftp():在当前ssh会话的基础上创建一个sftp会话。该方法会返回一个SFTPClient对象。
  • invoke_shell(): 用于创建一个子shell进程,这样所有的操作都可以在该子shell中进行,su切换用户不受影响,但该方法没有exec_command那种方便的ChannelFile对象,所有的标准输出和标准错误内容都通过invoke_shell返回对象的recv方法来获取,每一次调用recv只会从上一次返回的地方开始返回,也没有直接获取命令退出状态

SFTPClient常用方法介绍

SFTPCLient作为一个sftp的客户端对象,根据ssh传输协议的sftp会话,实现远程文件操作,如上传、下载、权限、状态

  • from_transport(cls,t) 创建一个已连通的SFTP客户端通道
  • put(localpath, remotepath, callback=None, confirm=True) 将本地文件上传到服务器 参数confirm:是否调用stat()方法检查文件状态,返回ls -l的结果
  • get(remotepath, localpath, callback=None) 从服务器下载文件到本地
  • mkdir() 在服务器上创建目录
  • remove() 在服务器上删除目录
  • rename() 在服务器上重命名目录
  • stat() 查看服务器文件状态
  • listdir() 列出服务器目录下的文件

具体示例方法

# 定义一个类,表示一台远端linux主机
class SSHLinux(object):
    # 通过IP, 用户名,密码,超时时间初始化一个远程Linux主机
    def __init__(self, host, username, password, timeout=30, port=22):
        self.host = host
        self.username = username
        self.password = password
        self.timeout = timeout
        self.port = port
        self.ssh = None
        self.trans = None
        self.sftp = None
        # 链接失败的重试次数
        self.try_times = 3

    def connect(self):
        while True:
            try:
                # 基于用户名和密码的 transport 方式登录
                self.trans = transport.Transport(self.host, self.port)  # 创建一个通道
                self.trans.connect(username=self.username, password=self.password)  # 连接

                self.ssh = client.SSHClient()  # 实例化SSHClient
                self.ssh.load_system_host_keys()  # 加载系统HostKeys密钥
                self.ssh._transport = self.trans
                # 自动添加策略,保存服务器的主机名和密钥信息,如果不添加,那么不再本地know_hosts文件中记录的主机将无法连接
                self.ssh.set_missing_host_key_policy(client.AutoAddPolicy())

                self.sftp = sftp_client.SFTPClient.from_transport(self.trans)  # 实例化一个SFTPClient对象
                return True
            except Exception as e:
                if self.try_times != 0:
                    print(f'连接{self.host}失败: {str(e)},进行重试...')
                    self.try_times -= 1
                else:
                    print(f'重试3次失败,结束程序')
                    return False

    def send(self, cmd):
        # get_pty=True 解决 nohup 执行了但是 进程没有启动成功
        # nohup 执行成功了,但是python程序阻塞了,无法停止,解决方式  nohup xx.xx  & sleep 1    &后增加sleep 1
        ssh_stdin, ssh_stdout, ssh_stderr = self.ssh.exec_command(cmd, get_pty=True)
        try:
            logger.info(f"ssh stdout:{ssh_stdout.read()}")
        except:
            logger.info(f"ssh stderr:{ssh_stderr.read()}")

    def send_invoke_shell_msg(self, password, cmd):
        """ 切换为invoke_shell模式执行命令
        :param password:
        :param cmd:
        :return:
        """
        try:
            channel = self.ssh.invoke_shell()
            time.sleep(0.1)

            channel.send(f"sudo su \n")  # 切换为root
            buff = ''
            while not buff.endswith(f'[sudo] password for {self.username}: '):
                resp = channel.recv(9999)
                buff += resp.decode('utf-8')

            channel.send(password)
            channel.send('\n')
            buff = ''
            while not buff.endswith('# '):  # 当指令执行结束后,Linux窗口会显示#,等待下条指令,所以可以用作识别全部输出结束的标志。
                resp = channel.recv(9999)
                buff += resp.decode('utf-8')
            print("------end------")

            # 查看是否切换成功
            channel.send(cmd)
            channel.send("\n")
            buff = ''
            while not buff.endswith('# '):
                resp = channel.recv(9999)
                buff += resp.decode('utf-8')
            print(buff)
        except paramiko.ssh_exception.AuthenticationException:
            print('Failed to login. ip username or password not correct.')
            exit(-1)

    def close(self):
        # 关闭连接
        self.trans.close()
        self.ssh.close()
        self.sftp.close()

    # SFTPClient
    def upload_file(self, local_path, remote_path):
        """ 文件上传
        :param local_path: 本地文件地址
        :param remote_path: 远程地址
        :return:
        """
        try:
            self.sftp.put(local_path, remote_path)
            logger.info(f"服务器:{self.host}, 文件: {remote_path.rsplit('/', 1)[-1]}上传成功.")
            return True
        except Exception as e:
            logger.error(f"文件:{remote_path.rsplit('/', 1)[-1]} 上传失败:{e}")
            return False

    def download_file(self, local_path, remote_path):
        """ 文件下载
        :param local_path: 本地要下载的目录
        :param remote_path: 远程文件地址
        :return:
        """
        try:
            self.sftp.get(remote_path, local_path)
            logger.info(f"服务器:{self.host}, 文件: {remote_path.rsplit('/', 1)[-1]}下载成功.")
            return True
        except Exception as e:
            logger.error(f"文件:{remote_path.rsplit('/', 1)[-1]} 下载失败:{e}")
            return False

    def update_chmod(self, path, mode=777):
        """ 文件权限修改 """
        try:
            self.sftp.chmod(path, mode)
            logger.info(f"服务器:{self.host}, 文件: {path}权限变更成功.")
            return True
        except Exception as e:
            logger.error(f"服务器:{self.host}, 文件: {path}权限变更失败: {e}.")
            return False

    def update_chown(self, path, uid, gid):
        """文件属主修改"""
        try:
            self.sftp.chown(path, uid, gid)
            logger.info(f"服务器:{self.host}, 文件: {path}属主变更成功.")
            return True
        except Exception as e:
            logger.error(f"服务器:{self.host}, 文件: {path}属主更新失败:{e}")
            return False

    def create_dir(self, path, mode=777):
        """新建文件夹"""
        if not os.path.exists(path):
            try:
                self.sftp.mkdir(path)
                logger.info(f"服务器:{self.host}, 文件夹: {path}创建成功.")
                return True
            except Exception as e:
                logger.error(f"服务器:{self.host}, 文件夹: {path}创建失败:{e}")
                return False

    def rm_dir(self, path):
        """删除文件夹"""
        try:
            self.sftp.rmdir(path)
            logger.info(f"服务器:{self.host}, 文件夹: {path}删除成功.")
            return True
        except Exception as e:
            logger.error(f"服务器:{self.host}, 文件夹: {path}删除失败:{e}")
            return False

    def remove_file(self, file_path):
        """删除文件"""
        try:
            self.sftp.remove(file_path)
            logger.info(f"服务器:{self.host}, 文件: {file_path}删除成功.")
            return True
        except Exception as e:
            logger.error(f"服务器:{self.host}, 文件: {file_path}删除失败:{e}")
            return False

    def rename_file(self, old_path, new_path):
        """文件重命名"""
        try:
            self.sftp.rename(old_path, new_path)
            logger.info(f"服务器:{self.host}, 文件: {new_path}重命名成功.")
            return True
        except Exception as e:
            logger.error(f"服务器:{self.host}, 文件: {new_path}重命名失败:{e}")
            return False

    def get_file(self, file_path, mode="r", bufsize=-1):
        """获取文件"""
        try:
            return self.sftp.file(file_path, mode=mode, bufsize=bufsize)
        except Exception as e:
            logger.error(f"获取文件失败:{e}")
            return False

    def open_file(self, file_path, mode="r", bufsize=-1):
        """打开文件"""
        try:
            return self.sftp.open(file_path, mode=mode, bufsize=bufsize)
        except Exception as e:
            logger.error(f"获取文件失败:{e}")
            return False

    def stat_path(self, file_path):
        """查看文件属性"""
        try:
            self.sftp.stat(file_path)
            return True
        except Exception as e:
            logger.error(f"文件不存在或stat失败:{e}")
            return False

总结

近几天在写链路测试平台,正好涉及到远程文件上传和执行,记录一下paramiko的使用过程!

发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言