玖叶教程网

前端编程开发入门

python ssh通道操作mysql(高中信息技术合格性考试python操作题)

import logging
import os
import sys
import pymysql
import pandas as pd
import datetime
import time
parent_path = os.path.dirname(sys.path[0])
if parent_path not in sys.path:
 sys.path.append(parent_path)
today = datetime.date.today().strftime("%Y-%m-%d")

from sshtunnel import SSHTunnelForwarder
today = datetime.date.today().strftime("%Y-%m-%d")
logging.basicConfig(level=logging.INFO,
 filename=os.path.join(os.getcwd(), 'open_cases_clean' + today + '.log'),
 # 模式,有w和a,w就是写模式,每次都会重新写日志,覆盖之前的日志
 filemode='w',
 # 日志格式
 format='%(process)d %(asctime)s %(filename)s %(funcName)s [line:%(lineno)d] %(levelname)s %(message)s'
 )

server = SSHTunnelForwarder(
 ("192.168.0.2", 22),
 ssh_password="root",
 ssh_username='root',
 remote_bind_address=("mysql.rds.com", 3306))

class Cle():
 def __init__(self):
 pass

 def clean(self):
 server.start()
 self.default_con = pymysql.connect(host='127.0.0.1', port=server.local_bind_port, user='mysql',
 passwd='root', db='root', charset='utf8',
 cursorclass=pymysql.cursors.DictCursor, connect_timeout=7200)
 self.cursor_con_tow = self.default_con.cursor()
 sql = "select * from tables"
 self.cursor_con_tow.execute(sql)
 rows = self.cursor_con_tow.fetchall() 
 for row in rows:
 print(row)

if __name__ == '__main__':
 cle = Cle()
 cle.clean()
 clean.cursor_con_tow.close()
 clean.default_con.close() 
 server.close()
import logging
import os
import sys
import pymysql
import pandas as pd
import datetime
import time
parent_path = os.path.dirname(sys.path[0])
if parent_path not in sys.path:
 sys.path.append(parent_path)
from sshtunnel import SSHTunnelForwarder

today = datetime.date.today().strftime("%Y-%m-%d")
logging.basicConfig(level=logging.INFO,
 filename=os.path.join(os.getcwd(), 'cle' + today + '.log'),
 # 模式,有w和a,w就是写模式,每次都会重新写日志,覆盖之前的日志
 filemode='w',
 # 日志格式
 format='%(process)d %(asctime)s %(filename)s %(funcName)s [line:%(lineno)d] %(levelname)s %(message)s'
 )

server = SSHTunnelForwarder(
 ("192.168.0.2", 22),
 ssh_password="root",
 ssh_username='root',
 remote_bind_address=("mysql.rds.com", 3306))

class Cle():
 def __init__(self):
 pass

 def clean(self):
 server.start()
 self.default_con = pymysql.connect(host='127.0.0.1', port=server.local_bind_port, user='mysql',
 passwd='root', db='root', charset='utf8',
 cursorclass=pymysql.cursors.DictCursor, connect_timeout=7200)
 self.cursor_con_tow = self.default_con.cursor()
 sql = "select * from tables"
 self.cursor_con_tow.execute(sql)
 rows = self.cursor_con_tow.fetchall() 
 for row in rows:
 print(row)

if __name__ == '__main__':
 cle = Cle()
 cle.clean()
 clean.cursor_con_tow.close()
 clean.default_con.close() 
 server.close()

发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言