赞
踩
"""Synchronise selected tables from a source MySQL database to a target one.

For every configured table, the target table is first created from the
source's ``SHOW CREATE TABLE`` output if it is missing. The table is then
either fully reloaded (truncate + insert) or incrementally updated: rows
whose "updated at" column lies inside the look-back window are deleted from
the target by primary key and re-inserted.
"""
import datetime
from time import time


def get_connection(host, user, passwd, dbname, port):
    """Open a MySQL connection and return ``(connection, cursor)``.

    The cursor is a PyMySQL ``DictCursor``, so every fetched row is a dict
    keyed by column name.
    """
    # Imported here because only this function needs the driver; the SQL
    # helpers below work with any DB-API style cursor and stay importable
    # without it.
    import pymysql

    # ``password``/``database`` are the current keyword names; ``passwd``/``db``
    # are legacy aliases.
    db = pymysql.connect(host=host, user=user, password=passwd,
                         database=dbname, port=port)
    cursor = db.cursor(pymysql.cursors.DictCursor)
    return db, cursor


def _quote_ident(name):
    """Return ``name`` as a backtick-quoted MySQL identifier.

    Quoting lets table/column names that are reserved words (e.g. ``order``)
    be used safely in generated SQL.
    """
    return "`" + str(name).replace("`", "``") + "`"


def _row_key(row, column):
    """Return the key of ``row`` that equals ``column`` ignoring case.

    MySQL column names are case-insensitive, but the dict keys returned by the
    cursor keep the spelling from the table definition.

    Raises:
        KeyError: if no key of ``row`` matches ``column``.
    """
    wanted = column.lower()
    for key in row:
        if key.lower() == wanted:
            return key
    raise KeyError(column)


def get_column_name(table_in, cursor, is_not_first, incremental_parame,
                    lookback=datetime.timedelta(days=1)):
    """Read the column names and the rows to copy for ``table_in``.

    Args:
        table_in: source table name.
        cursor: dict cursor on the source database.
        is_not_first: False when the target table was just created, which
            forces a full load.
        incremental_parame: maps table name -> name of the column holding the
            row's last-update time; tables not listed are always fully loaded.
        lookback: how far back from now the incremental window reaches
            (one day by default).

    Returns:
        ``(column_names, rows, full_reload, primary_key)``:

        - ``column_names``: lower-cased column names.
        - ``rows``: the fetched dict rows.
        - ``full_reload``: True when the target must be truncated and reloaded.
        - ``primary_key``: lower-cased key column, or ``''`` on a full reload.
    """
    cursor.execute("describe %s" % _quote_ident(table_in))
    column_name = [row['Field'].lower() for row in cursor.fetchall()]

    # Only the PRIMARY KEY of this table in the current database: without these
    # filters the query could also return foreign-key columns or columns of a
    # same-named table in another schema. The explicit alias keeps the result
    # key stable (MySQL 8 reports information_schema column labels in upper
    # case).
    cursor.execute(
        "select t.column_name as pk_column"
        " from INFORMATION_SCHEMA.KEY_COLUMN_USAGE t"
        " where t.table_schema = database() and t.table_name = %s"
        " and t.constraint_name = 'PRIMARY'"
        " order by t.ordinal_position",
        (table_in,))
    key_columns = [row['pk_column'].lower() for row in cursor.fetchall()]
    # Delete-then-insert needs a single-column key: deleting by only the first
    # column of a composite key would remove rows that are not re-inserted.
    PRIMARY_key = key_columns[0] if len(key_columns) == 1 else ''

    if PRIMARY_key and table_in in incremental_parame and is_not_first:
        # NOTE(review): uses the client's local clock; assumes the update-time
        # column is stored in the same time zone — confirm.
        since = (datetime.datetime.now() - lookback).strftime("%Y-%m-%d %H:%M:%S")
        valuesql = "select * from %s where %s > %%s" % (
            _quote_ident(table_in), _quote_ident(incremental_parame[table_in]))
        params = (since,)
        zt = False
    else:
        # No usable key, no update-time column, or first load: copy everything.
        valuesql = "select * from %s" % _quote_ident(table_in)
        params = None
        zt = True
        PRIMARY_key = ''
    print(valuesql, *(params or ()))
    cursor.execute(valuesql, params)
    value_result = cursor.fetchall()
    return column_name, value_result, zt, PRIMARY_key


def data_loads(column_names, value_result, cursor, table_name, db, truncat,
               PRIMARY_key, start_time=None):
    """Write ``value_result`` into ``table_name`` on the target and commit.

    Args:
        column_names: target column names, in the same order as the values of
            each row dict.
        value_result: rows to write (dicts from a DictCursor); may be empty.
        cursor: cursor of the target database.
        table_name: target table name.
        db: connection of the target database (committed / rolled back here).
        truncat: True to empty the table first (full reload); otherwise rows
            sharing a ``PRIMARY_key`` value with an incoming row are deleted
            before inserting.
        PRIMARY_key: single primary-key column name (any case); unused when
            ``truncat`` is True.
        start_time: ``time()`` value the reported run time is measured from;
            defaults to the moment this function is called.

    Returns:
        The string ``"successful"``.

    Raises:
        KeyError: if a row has no column matching ``PRIMARY_key``.
        Any database error, after rolling back the open transaction.
    """
    if start_time is None:
        start_time = time()
    rows = list(value_result)
    try:
        if truncat:
            # NOTE: TRUNCATE commits implicitly in MySQL, so it is not undone
            # by the rollback below if the insert fails.
            cursor.execute("truncate table %s;" % _quote_ident(table_name))
        elif rows:
            key = _row_key(rows[0], PRIMARY_key)
            # Values are passed as parameters so quotes in key values are safe.
            cursor.executemany(
                "delete from %s where %s = %%s"
                % (_quote_ident(table_name), _quote_ident(PRIMARY_key)),
                [(row[key],) for row in rows])
        if rows:
            col = ",".join(_quote_ident(str(c).lower()) for c in column_names)
            value = ",".join(["%s"] * len(rows[0]))
            sql = "insert into %s (%s) values (%s)" % (
                _quote_ident(table_name), col, value)
            # Relies on each row dict listing values in table column order,
            # which holds for rows fetched with ``select *``.
            cursor.executemany(sql, [tuple(row.values()) for row in rows])
        db.commit()
    except Exception:
        db.rollback()
        raise
    run_time = time() - start_time
    print(table_name + ' 插入数据条数:' + str(len(rows)) + ',运行时常:' + str(run_time) + '秒')
    return "successful"


def create_table(connection_in, connection_out, in_tablename):
    """Create ``in_tablename`` on the target using the source's DDL.

    The DDL is replayed verbatim: lower-casing it would also lower-case string
    literals such as ENUM/SET members, DEFAULT values and COMMENT text.
    """
    connection_in.execute("SHOW CREATE TABLE %s" % _quote_ident(in_tablename))
    ddl = connection_in.fetchone()['Create Table']
    connection_out.execute(ddl)
    return "successful"


def read_table_name(out_table, cursor):
    """Return the names of all tables in the cursor's current database.

    ``out_table`` (the database name) is kept for backward compatibility. It
    is no longer needed: each ``SHOW TABLES`` row has exactly one column,
    ``Tables_in_<database>``, so its single value is read directly. This
    avoids a KeyError when the configured database name is spelled in a
    different case than the server's.
    """
    cursor.execute("SHOW TABLES")
    return [next(iter(row.values())) for row in cursor.fetchall()]


def main():
    """Copy the configured tables from the source to the target database."""
    # Source database.
    db_in, connection_in = get_connection('host1', 'user1', 'password1', 'DB1', 3306)
    try:
        # Target database.
        out_table = 'DB2'
        db_out, connection_out = get_connection('host2', 'user2', 'password2', out_table, 3306)
        try:
            out_table_name = read_table_name(out_table, connection_out)
            # Tables to copy.
            tabname_list_in = ['person', 'class']
            # Per-table "last updated" column used for incremental loads.
            incremental_parame = {'person': 'updatetime', 'class': 'updatetime'}
            for table in tabname_list_in:
                start_time = time()
                is_not_first = True
                if table not in out_table_name:
                    create_table(connection_in, connection_out, table)
                    is_not_first = False
                column_names, value_result, truncat, PRIMARY_key = get_column_name(
                    table, connection_in, is_not_first, incremental_parame)
                # A full reload must run even with no source rows so that the
                # target is emptied as well.
                if value_result or truncat:
                    data_loads(column_names, value_result, connection_out, table,
                               db_out, truncat, PRIMARY_key, start_time=start_time)
        finally:
            connection_out.close()
            db_out.close()
    finally:
        connection_in.close()
        db_in.close()


if __name__ == '__main__':
    main()
使用前只需将代码中的 MySQL 主机、账号、密码，以及要同步的表名和对应的更新时间字段替换为真实值，即可直接运行（需先安装 pymysql）。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。