2024-09-01
Python
00

目录

pymysql封装
clickhouse_driver封装

pymysql封装

封装了select查询数据、insert插入数据(含分批插入)和update更新数据:

python
# encoding: utf-8
import traceback

import pymysql
import pandas as pd


class easy_pymysql():
    """Small pymysql wrapper that moves data between MySQL and pandas.

    Every public operation opens its own connection, does its work and closes
    the connection again, so an instance holds no live connection between
    calls. Database errors raised while executing a statement are printed
    with ``traceback.print_exc()`` and rolled back instead of being re-raised
    (best-effort behaviour kept from the original implementation).

    Table and column names are interpolated into the SQL text as-is, because
    identifiers cannot be sent as query parameters; they must come from
    trusted code. Row *values* are always sent as query parameters.
    """

    # Maximum number of rows handed to a single insert() call by insert_table().
    _BATCH_ROWS = 20000

    def __init__(self, host, user, passwd):
        """Store the connection settings; no connection is opened here."""
        self.host = host
        self.user = user
        self.passwd = passwd
        self.conn = None
        self.cursor = None

    def connect(self):
        """Open a new connection and cursor and store them on the instance.

        No default database is passed to ``pymysql.connect``.
        NOTE(review): callers presumably use ``db.table`` qualified names in
        their SQL and table arguments -- confirm.
        """
        self.conn = pymysql.connect(host=self.host,
                                    user=self.user,
                                    password=self.passwd,
                                    use_unicode=True,
                                    charset='utf8')
        self.cursor = self.conn.cursor()

    def close(self):
        """Close the cursor and then the connection opened by connect()."""
        self.cursor.close()
        self.conn.close()

    @staticmethod
    def _to_rows(df):
        """Return the frame's rows as plain tuples with missing values as None.

        Casting to ``object`` first lets ``where`` store ``None`` in numeric
        columns; the driver then sends ``None`` as SQL NULL instead of
        failing on NaN/NaT.
        """
        cleaned = df.astype(object).where(df.notna(), None)
        return list(cleaned.itertuples(index=False, name=None))

    def select_data(self, sql):
        """Run *sql* and return every fetched row as a DataFrame.

        The returned frame has positional (0..n-1) column labels. Returns
        ``None`` when executing or fetching fails; the error is printed.
        """
        result = None
        self.connect()
        try:
            self.cursor.execute(sql)
            result = pd.DataFrame(self.cursor.fetchall())
        except Exception:
            traceback.print_exc()
            self.conn.rollback()
        finally:
            # Always release the connection, even if rollback() itself fails.
            self.close()
        return result

    def insert(self, df: pd.DataFrame, tablename: str):
        """Insert all rows of *df* into *tablename* in one transaction.

        Column names of *df* are used as the target column list. An empty
        frame is a no-op, because ``INSERT ... VALUES`` without rows is not
        valid SQL. Failures are printed and rolled back, not raised.
        """
        if df.empty:
            return
        column_list = ", ".join(str(col) for col in df.columns)
        placeholders = ", ".join(["%s"] * len(df.columns))
        sql = f"INSERT INTO {tablename} ({column_list}) VALUES ({placeholders})"
        rows = self._to_rows(df)
        self.connect()
        try:
            # The driver escapes every value, so quotes, None and datetimes
            # are rendered as valid SQL literals.
            self.cursor.executemany(sql, rows)
            self.conn.commit()
        except Exception:
            traceback.print_exc()
            self.conn.rollback()
        finally:
            self.close()

    def insert_table(self, df: pd.DataFrame, tablename: str):
        """Insert *df* into *tablename* in batches of at most 20000 rows.

        Each batch goes through insert(), i.e. its own connection and
        transaction. Stepping by the batch size never produces an empty
        trailing batch when the row count is an exact multiple of it.
        """
        step = self._BATCH_ROWS
        for start in range(0, len(df), step):
            self.insert(df.iloc[start:start + step], tablename)
        print(f"finish transfer mysql {len(df)}")

    def update_table(self, df: pd.DataFrame, tablename: str):
        """Update one column of *tablename* row by row from a two-column frame.

        *df* must have exactly two columns: the first names the key column
        used in the WHERE clause, the second names the column to set. All
        updates are committed together; on any failure (including a frame
        with the wrong number of columns) the error is printed and the
        transaction is rolled back.
        """
        self.connect()
        try:
            idchname, upchname = list(df.columns)
            print(idchname, upchname)
            sql = f"UPDATE {tablename} SET {upchname}=%s WHERE {idchname}=%s"
            # Placeholder order is (new value, key), the reverse of the
            # frame's (key, new value) column order.
            params = [(value, key) for key, value in self._to_rows(df)]
            self.cursor.executemany(sql, params)
            self.conn.commit()
        except Exception:
            traceback.print_exc()
            self.conn.rollback()
        finally:
            self.close()

clickhouse_driver封装

只封装了select选择数据和insert插入数据:

python
# encoding: utf-8
from clickhouse_driver.client import Client
import pandas as pd


class easy_clickhouse():
    """Small clickhouse_driver wrapper that moves data to and from pandas.

    One ``Client`` is created in the constructor and reused for every call.
    Errors raised by the driver are not caught here and propagate to the
    caller.

    Table and column names are interpolated into the SQL text as-is, so they
    must come from trusted code.
    """

    # Maximum number of rows handed to a single insert() call by insert_table().
    _BATCH_ROWS = 20000

    def __init__(self, host):
        """Create the driver client for *host* with the driver's defaults."""
        self.host = host
        self.client = Client(host=self.host)

    def select_data(self, sql):
        """Run *sql* and return the rows as a DataFrame, or None if empty.

        The returned frame has positional (0..n-1) column labels.
        """
        rows = self.client.execute(sql)
        if not rows:
            return None
        return pd.DataFrame(rows)

    def insert(self, df: pd.DataFrame, tablename: str):
        """Insert all rows of *df* into *tablename* with one INSERT.

        Rows are handed to the driver as data (the query text ends at
        ``VALUES``) rather than being pasted into the SQL string, so strings
        containing quotes, ``None`` and datetimes are serialised by the
        driver. The driver serialises per column type, so the Python values
        must be acceptable for the target columns. An empty frame is a
        no-op.
        """
        if df.empty:
            return
        payload = df.to_dict(orient="split")
        column_list = ", ".join(str(col) for col in payload["columns"])
        rows = [tuple(row) for row in payload["data"]]
        self.client.execute(
            f"INSERT INTO {tablename} ({column_list}) VALUES", rows
        )

    def insert_table(self, df: pd.DataFrame, tablename: str):
        """Insert *df* into *tablename* in batches of at most 20000 rows.

        Stepping by the batch size never produces an empty trailing batch
        when the row count is an exact multiple of it.
        """
        step = self._BATCH_ROWS
        for start in range(0, len(df), step):
            self.insert(df.iloc[start:start + step], tablename)
        print(f"finish transfer clickhouse {len(df)}")
python
def _sql_literal(value):
    """Render one Python value as a SQL literal for the VALUES list."""
    if value is None:
        return "NULL"
    if isinstance(value, str):
        # Escape backslashes first so the escapes added for quotes survive.
        escaped = value.replace("\\", "\\\\").replace("'", "\\'")
        return "'%s'" % escaped
    return "{}".format(value)


def _build_insert_sql(table, records):
    """Build one multi-row ``insert into`` statement from a list of dicts."""
    columns = list(records[0])
    expected = set(columns)
    row_groups = []
    for record in records:
        if set(record) != expected:
            raise ValueError(
                "record keys %r do not match the first record's keys %r"
                % (sorted(record, key=str), sorted(expected, key=str))
            )
        # Read values by column name so differing key order cannot misalign.
        rendered = ",".join(_sql_literal(record[col]) for col in columns)
        row_groups.append("(%s)" % rendered)
    column_list = ",".join("%s" % col for col in columns)
    return "insert into %s (%s) values %s" % (
        table, column_list, ",".join(row_groups))


def insert_db_table(datadict_list, host='10.10.90.11', table="tbs"):
    """Insert a list of record dicts into a table with a single statement.

    Args:
        datadict_list: Iterable of dicts, one dict per row. The keys of the
            first dict define the column list; every other dict must have
            the same set of keys.
        host: Server address passed to ``clientclickhouse``. The default is
            the development-environment host used by the original code.
        table: Target table name, interpolated into the SQL as-is.

    Raises:
        ValueError: If a record's keys differ from the first record's keys.

    Nothing is executed, and no client is created, when there are no
    records. String values are quoted with backslash escaping and ``None``
    becomes ``NULL``.
    NOTE(review): ``clientclickhouse`` is defined outside this snippet; its
    ``execute`` is assumed to accept a complete SQL string -- confirm.
    """
    records = list(datadict_list)
    if not records:
        return
    sql_str = _build_insert_sql(table, records)
    c = clientclickhouse(host=host)
    c.execute(sql_str)
如果对你有用的话,可以打赏哦
打赏
ali pay
wechat pay

本文作者:Dong

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC 许可协议。本作品采用《知识共享署名-非商业性使用 4.0 国际许可协议》进行许可。您可以在非商业用途下自由转载和修改,但必须注明出处并提供原作者链接。转载请注明出处!