当前位置:   article > 正文

MySQL数据同步到另一台MySQL(全量及增量同步)_mysql 全量与增量同步切换 不停库

mysql 全量与增量同步切换 不停库
import datetime
from time import time
import pymysql

def get_connection(host, user, passwd, dbname, port, charset="utf8mb4"):
    """Open a PyMySQL connection and a dict-row cursor on it.

    :param host: MySQL server host.
    :param user: login user.
    :param passwd: login password.
    :param dbname: default database selected for the connection.
    :param port: server port.
    :param charset: connection character set. Set explicitly so non-Latin text
        (e.g. Chinese) round-trips regardless of the PyMySQL version's default.
    :returns: ``(connection, cursor)``; the cursor returns rows as dicts.
    """
    # ``password``/``database`` are the current PyMySQL keyword names;
    # ``passwd``/``db`` are deprecated aliases.
    db = pymysql.connect(host=host, user=user, password=passwd, database=dbname,
                         port=port, charset=charset)
    cursor = db.cursor(pymysql.cursors.DictCursor)
    return db, cursor

# 读取库表,得到所有的表名
def get_column_name(table_in, cursor, is_not_first, incremental_parame,
                    lookback=datetime.timedelta(days=1)):
    """Read a source table's columns and primary key, and fetch the rows to sync.

    An incremental fetch (only rows whose configured update-time column is newer
    than ``now - lookback``) is used when all of these hold:
    the table has a single-column primary key, ``incremental_parame`` names an
    update-time column for it, and ``is_not_first`` is true. Otherwise every row
    is fetched and the caller is told to do a full reload.

    :param table_in: source table name.
    :param cursor: dict-row cursor on the source connection. Assumes the
        connection's default database is the source schema (``DATABASE()``).
    :param is_not_first: False when the target table was just created.
    :param incremental_parame: mapping of table name -> update-time column.
    :param lookback: how far back the incremental window reaches (default 1 day).
    :returns: ``(column_name, value_result, zt, PRIMARY_key)``:
        lower-cased column names in table order; the fetched dict rows;
        ``zt`` True for a full load (the caller should truncate first);
        the lower-cased primary-key column, or ``''`` for a full load.
    """
    table_sql = "`%s`" % table_in.replace("`", "``")

    # Column names in table order, which is also the ``SELECT *`` order.
    cursor.execute("describe %s" % table_sql)
    column_name = [row['Field'].lower() for row in cursor.fetchall()]

    # Only PRIMARY KEY columns of this schema's table: KEY_COLUMN_USAGE also
    # lists foreign-key/unique columns and same-named tables in other schemas.
    # The explicit alias is needed because MySQL 8 returns information_schema
    # column labels in upper case.
    cursor.execute(
        "select column_name as pk_column"
        " from INFORMATION_SCHEMA.KEY_COLUMN_USAGE"
        " where table_schema = DATABASE() and table_name = %s"
        " and constraint_name = 'PRIMARY'"
        " order by ordinal_position",
        (table_in,))
    pk_rows = cursor.fetchall()
    # A composite key cannot drive the per-row delete done by the loader:
    # deleting by its first column alone would also remove unrelated rows.
    # Treat it like "no primary key" so the table gets a full reload instead.
    PRIMARY_key = pk_rows[0]['pk_column'].lower() if len(pk_rows) == 1 else ''

    inc_col = incremental_parame.get(table_in)
    if PRIMARY_key and inc_col and is_not_first:
        # Incremental: rows updated within the lookback window.
        cutoff = (datetime.datetime.now() - lookback).strftime("%Y-%m-%d %H:%M:%S")
        valuesql = "select * from %s where `%s` > %%s" % (table_sql, inc_col.replace("`", "``"))
        params = (cutoff,)
        zt = False
    else:
        # No usable key, no update column configured, or first import: full load.
        valuesql = "select * from %s" % table_sql
        params = None
        zt = True
        PRIMARY_key = ''
    print(valuesql if params is None else "%s %r" % (valuesql, params))
    cursor.execute(valuesql, params)
    value_result = cursor.fetchall()
    return column_name, value_result, zt, PRIMARY_key


# 数据插入
def data_loads(column_names, value_result, cursor, table_name, db, truncat, PRIMARY_key,
               start_time=None):
    """Write fetched source rows into ``table_name`` on the target connection.

    Two modes, selected by ``truncat``:

    * true  -- full load: truncate the target table, then insert every row.
    * false -- incremental load: delete the target rows whose ``PRIMARY_key``
      value matches a fetched row, then insert the fetched rows.

    :param column_names: target column names, in the same order as the values
        of each row dict.
    :param value_result: list of dict rows (DictCursor output); may be empty.
    :param cursor: cursor on the target connection.
    :param table_name: target table name.
    :param db: target connection; committed on success, rolled back on error.
    :param truncat: True for a full reload, False for a key-based incremental load.
    :param PRIMARY_key: single primary-key column; matched case-insensitively
        against the row dict keys. Required for an incremental load with rows.
    :param start_time: ``time()`` value the reported run time is measured from;
        defaults to the moment this function is entered.
    :returns: the string ``"successful"``.
    :raises ValueError: incremental load requested without a primary key.
    :raises KeyError: the primary-key column is not among the row keys.
    """
    if start_time is None:
        start_time = time()

    def quote(name):
        # Backtick-quote an identifier so reserved words / unusual names are safe.
        return "`%s`" % str(name).replace("`", "``")

    table_sql = quote(table_name)
    data = [tuple(row.values()) for row in value_result]
    try:
        if truncat:
            # NOTE: TRUNCATE is DDL in MySQL and commits implicitly, so a later
            # failure cannot restore the old contents via rollback.
            cursor.execute("truncate table %s;" % table_sql)
        elif value_result:
            if not PRIMARY_key:
                raise ValueError("incremental load of %s needs a primary key" % table_name)
            # Row dict keys keep the source's spelling while PRIMARY_key may be
            # lower-cased, so resolve the real key once, case-insensitively.
            wanted = PRIMARY_key.lower()
            pk_field = next((k for k in value_result[0] if str(k).lower() == wanted), None)
            if pk_field is None:
                raise KeyError(PRIMARY_key)
            # Parameterized so key values containing quotes cannot break the SQL.
            cursor.executemany(
                "delete from %s where %s = %%s" % (table_sql, quote(PRIMARY_key)),
                [(row[pk_field],) for row in value_result])
        if data:
            col = ",".join(quote(str(c).lower()) for c in column_names)
            value = ",".join(["%s"] * len(column_names))
            cursor.executemany("insert into %s(%s) values (%s)" % (table_sql, col, value), data)
        db.commit()
    except Exception:
        # Undo the uncommitted deletes/inserts before propagating the error.
        db.rollback()
        raise
    run_time = time() - start_time
    print(table_name + ' 插入数据条数:' + str(len(data)) + ',运行时常:' + str(run_time) + '秒')
    return "successful"


# 不存在创建表
def create_table(connection_in, connection_out, in_tablename):
    """Create ``in_tablename`` on the target by replaying the source's DDL.

    Reads the ``SHOW CREATE TABLE`` statement through ``connection_in`` (a
    dict-row cursor on the source) and executes it through ``connection_out``
    (a cursor on the target).

    The DDL is replayed verbatim. It is deliberately not lower-cased: that would
    also rewrite string literals inside it (``DEFAULT`` values, ``COMMENT`` text,
    ``ENUM``/``SET`` members) and silently change the schema.

    :returns: the string ``"successful"``.
    """
    connection_in.execute("SHOW CREATE TABLE `%s`" % in_tablename.replace("`", "``"))
    ddl = connection_in.fetchone()['Create Table']
    # Make the statement idempotent in case the table already exists on the
    # target (e.g. a name-case mismatch made the caller think it was missing).
    prefix = "CREATE TABLE "
    if ddl.upper().startswith(prefix):
        ddl = "CREATE TABLE IF NOT EXISTS " + ddl[len(prefix):]
    connection_out.execute(ddl)
    return "successful"


# 获取out的所有表
def read_table_name(out_table, cursor):
    """Return the names listed by ``SHOW TABLES`` for the cursor's current database.

    :param out_table: database name. No longer needed to read the result; kept
        so existing callers keep working.
    :param cursor: dict-row cursor on the target connection.
    :returns: list of table names, in server order.
    """
    cursor.execute("SHOW TABLES")
    # Each row has a single column labelled "Tables_in_<db>". The server's
    # spelling of <db> may differ in case from the name the caller passed, so
    # take the row's only value rather than rebuilding that label.
    return [next(iter(row.values())) for row in cursor.fetchall()]


if __name__ == '__main__':
    # Source database connection.
    db_in, connection_in = get_connection('host1', 'user1', 'password1', 'DB1', 3306)
    try:
        # Target database connection; ``out_table`` holds the target *database* name.
        out_table = 'DB2'
        db_out, connection_out = get_connection('host2', 'user2', 'password2', out_table, 3306)
        try:
            out_table_name = read_table_name(out_table, connection_out)
            # Tables to copy from the source database.
            tabname_list_in = ['person', 'class']
            # Per-table update-time column used for incremental sync (fill in manually).
            incremental_parame = {'person': 'updatetime', 'class': 'updatetime'}
            for i in tabname_list_in:
                # Module-level on purpose: data_loads may read this global for
                # its run-time printout.
                start_time = time()
                is_not_first = True
                if i not in out_table_name:
                    # Missing on the target: create it and treat this run as the first import.
                    create_table(connection_in, connection_out, i)
                    is_not_first = False
                # Columns, fetched rows, full-load flag and primary key for this table.
                column_names, value_result, truncat, PRIMARY_key = get_column_name(
                    i, connection_in, is_not_first, incremental_parame)
                # Write only when rows were fetched. NOTE: on a full load this
                # means an empty source table leaves the target's old rows in place.
                if value_result:
                    data_loads(column_names, value_result, connection_out, i, db_out,
                               truncat, PRIMARY_key)
        finally:
            # Close the target cursor and connection even if a table failed.
            connection_out.close()
            db_out.close()
    finally:
        connection_in.close()
        db_in.close()

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121

只需将代码中的 MySQL 主机、账号、密码,以及待同步的表名和对应的更新时间字段替换为实际值,即可直接运行。注意:增量同步仅适用于有主键、且在 incremental_parame 中配置了更新时间字段的表;源库中被删除的记录不会在增量同步时同步到目标库。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/978008
推荐阅读
相关标签
  

闽ICP备14008679号