paramiko介绍
paramiko是一个用于做远程控制的模块,使用该模块可以对远程服务器进行命令或文件操作,值得一说的是,fabric和ansible内部的远程管理就是使用的paramiko来现实。其实它的底层是对ssh的上层代码的一个封装。
ssh是一个协议,OpenSSH是其中一个开源实现,paramiko是Python的一个库,实现了SSHv2协议(底层使用cryptography)。
有了Paramiko以后,我们就可以在Python代码中直接使用SSH协议对远程服务器执行操作,而不是通过ssh命令对远程服务器进行操作。
安装
paramiko属于第三方库,依赖pycrypto库:
pip3 install pycrypto # Windows安装失败,安装pycrypto?dome 或者 pycryptodomex.pycryptodomex 一个独立于旧 PyCrypto 的库,所有模块都安装在Cryptodome包下。旧的 PyCrypto 和 PyCryptodome 可以共存
pip3 install pycryptodomex
pip3 install paramiko
paramiko核心组件-SSHClient和SFTPClient
- SSHClient的作用类似于Linux的ssh命令,是对SSH会话的封装,该类封装了传输(Transport),通道(Channel)及SFTPClient建立的方法(open_sftp),通常用于执行远程命令。
- SFTPClient的作用类似与Linux的sftp命令,是对SFTP客户端的封装,用以实现远程文件操作,如文件上传、下载、修改文件权限等操作。
SSHClient常用的方法介绍
- connect():实现远程服务器的连接与认证,对于该方法只有hostname是必传参数。
常用参数:
hostname 连接的目标主机
port=SSH_PORT 指定端口
username=None 验证的用户名
password=None 验证的用户密码
pkey=None 私钥方式用于身份验证
key_filename=None 一个文件名或文件列表,指定私钥文件
timeout=None 可选的tcp连接超时时间
allow_agent=True, 是否允许连接到ssh代理,默认为True 允许
look_for_keys=True 是否在~/.ssh中搜索私钥文件,默认为True 允许
compress=False, 是否打开压缩
- set_missing_host_key_policy():设置远程服务器没有在know_hosts文件中记录时的应对策略。目前支持三种策略:
设置连接的远程主机没有本地主机密钥或HostKeys对象时的策略,目前支持三种:
- AutoAddPolicy 自动添加主机名及主机密钥到本地HostKeys对象,不依赖load_system_host_key的配置。即新建立ssh连接时不需要再输入yes或no进行确认;
- WarningPolicy 用于记录一个未知的主机密钥的python警告。并接受,功能上和AutoAddPolicy类似,但是会提示是新连接;
- RejectPolicy 自动拒绝未知的主机名和密钥,依赖load_system_host_key的配置。此为默认选项。
- exec_command():在远程服务器执行Linux命令的方法。
- open_sftp():在当前ssh会话的基础上创建一个sftp会话。该方法会返回一个SFTPClient对象。
- invoke_shell(): 用于创建一个子shell进程,这样所有的操作都可以在该子shell中进行,su切换用户不受影响,但该方法没有exec_command那种方便的ChannelFile对象,所有的标准输出和标准错误内容都通过invoke_shell返回对象的recv方法来获取,每一次调用recv只会从上一次返回的地方开始返回,也没有直接获取命令退出状态
SFTPClient常用方法介绍
SFTPCLient作为一个sftp的客户端对象,根据ssh传输协议的sftp会话,实现远程文件操作,如上传、下载、权限、状态
- from_transport(cls,t) 创建一个已连通的SFTP客户端通道
- put(localpath, remotepath, callback=None, confirm=True) 将本地文件上传到服务器 参数confirm:是否调用stat()方法检查文件状态,返回ls -l的结果
- get(remotepath, localpath, callback=None) 从服务器下载文件到本地
- mkdir() 在服务器上创建目录
- remove() 在服务器上删除目录
- rename() 在服务器上重命名目录
- stat() 查看服务器文件状态
- listdir() 列出服务器目录下的文件
具体示例方法
# 定义一个类,表示一台远端linux主机
class SSHLinux(object):
# 通过IP, 用户名,密码,超时时间初始化一个远程Linux主机
def __init__(self, host, username, password, timeout=30, port=22):
self.host = host
self.username = username
self.password = password
self.timeout = timeout
self.port = port
self.ssh = None
self.trans = None
self.sftp = None
# 链接失败的重试次数
self.try_times = 3
def connect(self):
while True:
try:
# 基于用户名和密码的 transport 方式登录
self.trans = transport.Transport(self.host, self.port) # 创建一个通道
self.trans.connect(username=self.username, password=self.password) # 连接
self.ssh = client.SSHClient() # 实例化SSHClient
self.ssh.load_system_host_keys() # 加载系统HostKeys密钥
self.ssh._transport = self.trans
# 自动添加策略,保存服务器的主机名和密钥信息,如果不添加,那么不再本地know_hosts文件中记录的主机将无法连接
self.ssh.set_missing_host_key_policy(client.AutoAddPolicy())
self.sftp = sftp_client.SFTPClient.from_transport(self.trans) # 实例化一个SFTPClient对象
return True
except Exception as e:
if self.try_times != 0:
print(f'连接{self.host}失败: {str(e)},进行重试...')
self.try_times -= 1
else:
print(f'重试3次失败,结束程序')
return False
def send(self, cmd):
# get_pty=True 解决 nohup 执行了但是 进程没有启动成功
# nohup 执行成功了,但是python程序阻塞了,无法停止,解决方式 nohup xx.xx & sleep 1 &后增加sleep 1
ssh_stdin, ssh_stdout, ssh_stderr = self.ssh.exec_command(cmd, get_pty=True)
try:
logger.info(f"ssh stdout:{ssh_stdout.read()}")
except:
logger.info(f"ssh stderr:{ssh_stderr.read()}")
def send_invoke_shell_msg(self, password, cmd):
""" 切换为invoke_shell模式执行命令
:param password:
:param cmd:
:return:
"""
try:
channel = self.ssh.invoke_shell()
time.sleep(0.1)
channel.send(f"sudo su \n") # 切换为root
buff = ''
while not buff.endswith(f'[sudo] password for {self.username}: '):
resp = channel.recv(9999)
buff += resp.decode('utf-8')
channel.send(password)
channel.send('\n')
buff = ''
while not buff.endswith('# '): # 当指令执行结束后,Linux窗口会显示#,等待下条指令,所以可以用作识别全部输出结束的标志。
resp = channel.recv(9999)
buff += resp.decode('utf-8')
print("------end------")
# 查看是否切换成功
channel.send(cmd)
channel.send("\n")
buff = ''
while not buff.endswith('# '):
resp = channel.recv(9999)
buff += resp.decode('utf-8')
print(buff)
except paramiko.ssh_exception.AuthenticationException:
print('Failed to login. ip username or password not correct.')
exit(-1)
def close(self):
# 关闭连接
self.trans.close()
self.ssh.close()
self.sftp.close()
# SFTPClient
def upload_file(self, local_path, remote_path):
""" 文件上传
:param local_path: 本地文件地址
:param remote_path: 远程地址
:return:
"""
try:
self.sftp.put(local_path, remote_path)
logger.info(f"服务器:{self.host}, 文件: {remote_path.rsplit('/', 1)[-1]}上传成功.")
return True
except Exception as e:
logger.error(f"文件:{remote_path.rsplit('/', 1)[-1]} 上传失败:{e}")
return False
def download_file(self, local_path, remote_path):
""" 文件下载
:param local_path: 本地要下载的目录
:param remote_path: 远程文件地址
:return:
"""
try:
self.sftp.get(remote_path, local_path)
logger.info(f"服务器:{self.host}, 文件: {remote_path.rsplit('/', 1)[-1]}下载成功.")
return True
except Exception as e:
logger.error(f"文件:{remote_path.rsplit('/', 1)[-1]} 下载失败:{e}")
return False
def update_chmod(self, path, mode=777):
""" 文件权限修改 """
try:
self.sftp.chmod(path, mode)
logger.info(f"服务器:{self.host}, 文件: {path}权限变更成功.")
return True
except Exception as e:
logger.error(f"服务器:{self.host}, 文件: {path}权限变更失败: {e}.")
return False
def update_chown(self, path, uid, gid):
"""文件属主修改"""
try:
self.sftp.chown(path, uid, gid)
logger.info(f"服务器:{self.host}, 文件: {path}属主变更成功.")
return True
except Exception as e:
logger.error(f"服务器:{self.host}, 文件: {path}属主更新失败:{e}")
return False
def create_dir(self, path, mode=777):
"""新建文件夹"""
if not os.path.exists(path):
try:
self.sftp.mkdir(path)
logger.info(f"服务器:{self.host}, 文件夹: {path}创建成功.")
return True
except Exception as e:
logger.error(f"服务器:{self.host}, 文件夹: {path}创建失败:{e}")
return False
def rm_dir(self, path):
"""删除文件夹"""
try:
self.sftp.rmdir(path)
logger.info(f"服务器:{self.host}, 文件夹: {path}删除成功.")
return True
except Exception as e:
logger.error(f"服务器:{self.host}, 文件夹: {path}删除失败:{e}")
return False
def remove_file(self, file_path):
"""删除文件"""
try:
self.sftp.remove(file_path)
logger.info(f"服务器:{self.host}, 文件: {file_path}删除成功.")
return True
except Exception as e:
logger.error(f"服务器:{self.host}, 文件: {file_path}删除失败:{e}")
return False
def rename_file(self, old_path, new_path):
"""文件重命名"""
try:
self.sftp.rename(old_path, new_path)
logger.info(f"服务器:{self.host}, 文件: {new_path}重命名成功.")
return True
except Exception as e:
logger.error(f"服务器:{self.host}, 文件: {new_path}重命名失败:{e}")
return False
def get_file(self, file_path, mode="r", bufsize=-1):
"""获取文件"""
try:
return self.sftp.file(file_path, mode=mode, bufsize=bufsize)
except Exception as e:
logger.error(f"获取文件失败:{e}")
return False
def open_file(self, file_path, mode="r", bufsize=-1):
"""打开文件"""
try:
return self.sftp.open(file_path, mode=mode, bufsize=bufsize)
except Exception as e:
logger.error(f"获取文件失败:{e}")
return False
def stat_path(self, file_path):
"""查看文件属性"""
try:
self.sftp.stat(file_path)
return True
except Exception as e:
logger.error(f"文件不存在或stat失败:{e}")
return False
总结
近几天在写链路测试平台,正好涉及到远程文件上传和执行,记录一下paramiko的使用过程!