import logging
import os
import sys
import pymysql
import pandas as pd
import datetime
import time
parent_path = os.path.dirname(sys.path[0])
if parent_path not in sys.path:
sys.path.append(parent_path)
today = datetime.date.today().strftime("%Y-%m-%d")
from sshtunnel import SSHTunnelForwarder
today = datetime.date.today().strftime("%Y-%m-%d")
logging.basicConfig(level=logging.INFO,
filename=os.path.join(os.getcwd(), 'open_cases_clean' + today + '.log'),
# 模式,有w和a,w就是写模式,每次都会重新写日志,覆盖之前的日志
filemode='w',
# 日志格式
format='%(process)d %(asctime)s %(filename)s %(funcName)s [line:%(lineno)d] %(levelname)s %(message)s'
)
server = SSHTunnelForwarder(
("192.168.0.2", 22),
ssh_password="root",
ssh_username='root',
remote_bind_address=("mysql.rds.com", 3306))
class Cle():
def __init__(self):
pass
def clean(self):
server.start()
self.default_con = pymysql.connect(host='127.0.0.1', port=server.local_bind_port, user='mysql',
passwd='root', db='root', charset='utf8',
cursorclass=pymysql.cursors.DictCursor, connect_timeout=7200)
self.cursor_con_tow = self.default_con.cursor()
sql = "select * from tables"
self.cursor_con_tow.execute(sql)
rows = self.cursor_con_tow.fetchall()
for row in rows:
print(row)
if __name__ == '__main__':
cle = Cle()
cle.clean()
clean.cursor_con_tow.close()
clean.default_con.close()
server.close()
import logging
import os
import sys
import pymysql
import pandas as pd
import datetime
import time
parent_path = os.path.dirname(sys.path[0])
if parent_path not in sys.path:
sys.path.append(parent_path)
from sshtunnel import SSHTunnelForwarder
today = datetime.date.today().strftime("%Y-%m-%d")
logging.basicConfig(level=logging.INFO,
filename=os.path.join(os.getcwd(), 'cle' + today + '.log'),
# 模式,有w和a,w就是写模式,每次都会重新写日志,覆盖之前的日志
filemode='w',
# 日志格式
format='%(process)d %(asctime)s %(filename)s %(funcName)s [line:%(lineno)d] %(levelname)s %(message)s'
)
server = SSHTunnelForwarder(
("192.168.0.2", 22),
ssh_password="root",
ssh_username='root',
remote_bind_address=("mysql.rds.com", 3306))
class Cle():
def __init__(self):
pass
def clean(self):
server.start()
self.default_con = pymysql.connect(host='127.0.0.1', port=server.local_bind_port, user='mysql',
passwd='root', db='root', charset='utf8',
cursorclass=pymysql.cursors.DictCursor, connect_timeout=7200)
self.cursor_con_tow = self.default_con.cursor()
sql = "select * from tables"
self.cursor_con_tow.execute(sql)
rows = self.cursor_con_tow.fetchall()
for row in rows:
print(row)
if __name__ == '__main__':
cle = Cle()
cle.clean()
clean.cursor_con_tow.close()
clean.default_con.close()
server.close()