封装了select选择数据、insert插入数据和update更新数据:
# encoding: utf-8
import traceback
import pymysql
import pandas as pd
class easy_pymysql():
    """Small pymysql helper that reads into and writes from pandas DataFrames.

    Each public operation calls ``connect()`` itself and closes the
    connection again before returning. Database errors raised while a
    statement runs are printed with ``traceback.print_exc()`` and rolled
    back instead of being propagated.

    Table and column names are interpolated into the SQL text as-is, so
    they must come from trusted code; only the row values are passed as
    query parameters.
    """

    def __init__(self, host, user, passwd):
        """Remember the connection settings; no connection is opened yet."""
        self.host = host
        self.user = user
        self.passwd = passwd
        self.conn = None
        self.cursor = None

    def connect(self):
        """Open a utf8 connection and create a cursor on it.

        No default database is passed to ``pymysql.connect``.
        """
        self.conn = pymysql.connect(host=self.host,
                                    user=self.user,
                                    password=self.passwd,
                                    use_unicode=True,
                                    charset='utf8')
        self.cursor = self.conn.cursor()

    def close(self):
        """Close the cursor and the connection if they were ever created."""
        if self.cursor is not None:
            self.cursor.close()
        if self.conn is not None:
            self.conn.close()

    def select_data(self, sql):
        """Run ``sql`` and return all fetched rows as a DataFrame.

        Returns ``None`` when executing or fetching fails (the traceback is
        printed).
        """
        result = None
        self.connect()
        try:
            self.cursor.execute(sql)
            result = pd.DataFrame(self.cursor.fetchall())
        except Exception:
            traceback.print_exc()
            self.conn.rollback()
        finally:
            # Always release the connection, even if rollback itself fails.
            self.close()
        return result

    def insert(self, df: pd.DataFrame, tablename: str):
        """Insert every row of ``df`` into ``tablename`` in one transaction.

        The DataFrame column labels are used as the target column names.
        Values are sent as query parameters, so strings containing quotes
        and one-column frames produce valid SQL. Float NaN is sent as NULL.
        An empty DataFrame is a no-op.
        """
        payload = df.to_dict(orient="split")
        columns = [str(col) for col in payload["columns"]]
        rows = [
            # NaN is the only float that differs from itself; send it as NULL.
            tuple(None if isinstance(v, float) and v != v else v for v in row)
            for row in payload["data"]
        ]
        if not rows:
            return
        column_list = ", ".join(columns)
        placeholders = ", ".join(["%s"] * len(columns))
        sql = f"INSERT INTO {tablename} ({column_list}) VALUES ({placeholders})"
        self.connect()
        try:
            self.cursor.executemany(sql, rows)
            self.conn.commit()
        except Exception:
            traceback.print_exc()
            self.conn.rollback()
        finally:
            self.close()

    def insert_table(self, df: pd.DataFrame, tablename: str, batch_size: int = 20000):
        """Insert ``df`` into ``tablename`` in batches of ``batch_size`` rows.

        Each batch goes through ``insert`` (its own connection and
        transaction). No empty batch is issued when the row count is an
        exact multiple of ``batch_size``.

        Raises:
            ValueError: if ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")
        total = len(df)
        for start in range(0, total, batch_size):
            self.insert(df.iloc[start:start + batch_size], tablename)
        print(f"finish transfer mysql {len(df)}")

    def update_table(self, df: pd.DataFrame, tablename: str):
        """Update one column of ``tablename`` row by row from a two-column frame.

        ``df`` must have exactly two columns: the first names the key column
        used in the WHERE clause, the second names the column to set. All
        updates are committed together; on any error the traceback is
        printed and the transaction is rolled back.
        """
        self.connect()
        try:
            idchname, upchname = list(df.columns)
            print(idchname, upchname)
            sql = f"UPDATE {tablename} SET {upchname}=%s WHERE {idchname}=%s"
            # tolist() yields plain Python scalars rather than numpy scalar types.
            params = list(zip(df[upchname].tolist(), df[idchname].tolist()))
            if params:
                self.cursor.executemany(sql, params)
            self.conn.commit()
        except Exception:
            traceback.print_exc()
            self.conn.rollback()
        finally:
            self.close()
只封装了select选择数据和insert插入数据:
# encoding: utf-8
from clickhouse_driver.client import Client
import pandas as pd
class easy_clickhouse():
    """Small clickhouse_driver helper that reads into and writes from DataFrames.

    Table and column names are interpolated into the SQL text as-is, so
    they must come from trusted code. Row values are rendered as SQL
    literals by ``_sql_literal``.
    """

    def __init__(self, host):
        """Create a ``Client`` for ``host``."""
        self.host = host
        self.client = Client(host=self.host)

    def select_data(self, sql):
        """Run ``sql`` and return the rows as a DataFrame, or ``None`` if empty."""
        result = self.client.execute(sql)
        if len(result) == 0:
            return None
        return pd.DataFrame(result)

    @staticmethod
    def _sql_literal(value):
        """Render one cell value as SQL literal text.

        ``None`` becomes ``NULL``; strings are single-quoted with backslashes
        and single quotes backslash-escaped; everything else uses ``repr``,
        which matches how numbers were rendered before.
        """
        if value is None:
            return "NULL"
        if isinstance(value, str):
            escaped = value.replace("\\", "\\\\").replace("'", "\\'")
            return f"'{escaped}'"
        return repr(value)

    def insert(self, df: pd.DataFrame, tablename: str):
        """Insert every row of ``df`` into ``tablename`` with one INSERT statement.

        The DataFrame column labels are used as the target column names.
        An empty DataFrame is a no-op. Rows are rendered explicitly rather
        than through the repr of a tuple, which added a trailing comma for
        one-column rows and switched to double quotes for strings that
        contain a single quote.
        """
        payload = df.to_dict(orient="split")
        if not payload["data"]:
            return
        tup_col = ", ".join(str(col) for col in payload["columns"])
        datas = ", ".join(
            "(" + ", ".join(self._sql_literal(v) for v in row) + ")"
            for row in payload["data"]
        )
        sql = f"INSERT INTO {tablename} ({tup_col}) VALUES {datas}"
        self.client.execute(sql)

    def insert_table(self, df: pd.DataFrame, tablename: str, batch_size: int = 20000):
        """Insert ``df`` into ``tablename`` in batches of ``batch_size`` rows.

        No empty batch is issued when the row count is an exact multiple of
        ``batch_size``.

        Raises:
            ValueError: if ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")
        total = len(df)
        for start in range(0, total, batch_size):
            self.insert(df.iloc[start:start + batch_size], tablename)
        print(f"finish transfer clickhouse {len(df)}")
def insert_db_table(datadict_list, host='10.10.90.11', table="tbs"):
    """Insert a list of record dicts into a ClickHouse table with one statement.

    Args:
        datadict_list: iterable of dicts, one dict per row. The keys of the
            first dict define the column list; every later dict is read by
            those same keys, so differing key order cannot misalign values.
        host: server host; defaults to the development environment address.
        table: target table name, interpolated into the SQL as-is.

    String values are single-quoted with backslashes and single quotes
    escaped, ``None`` is written as ``NULL``, and any other value is
    rendered with ``str.format``. An empty input executes nothing.

    Raises:
        KeyError: if a later record lacks one of the first record's keys.
    """
    records = list(datadict_list)
    if not records:
        # Nothing to send; the statement would otherwise have no VALUES part.
        return

    columns = list(records[0].keys())

    def render(value):
        # Turn one Python value into SQL literal text.
        if value is None:
            return "NULL"
        if isinstance(value, str):
            escaped = value.replace("\\", "\\\\").replace("'", "\\'")
            return "'%s'" % escaped
        return "{}".format(value)

    names = ",".join("%s" % col for col in columns)
    row_groups = [
        "(%s)" % ",".join(render(record[col]) for col in columns)
        for record in records
    ]
    sql_str = "insert into %s (%s) values %s" % (table, names, ",".join(row_groups))

    c = clientclickhouse(host=host)
    c.execute(sql_str)
本文作者:Dong
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC 许可协议。本作品采用《知识共享署名-非商业性使用 4.0 国际许可协议》进行许可。您可以在非商业用途下自由转载和修改,但必须注明出处并提供原作者链接。转载请注明出处!