赞
踩
否则会报找不到 pymysql 模块的错误。
# Copy the documents of one Elasticsearch index that fall inside a time window
# into a MySQL table, using pandas as the intermediate representation.
#
# All "xxxx" / "xxx.xx.xx.xx" values below are redacted placeholders and must
# be replaced with real settings before the script can run.
import time
from urllib.parse import quote_plus as urlquote

import pandas as pd
from elasticsearch import Elasticsearch
from sqlalchemy import create_engine

pd.set_option('expand_frame_repr', False)  # do not wrap wide frames when printing
pd.set_option('display.max_rows', 10000)  # maximum number of rows to print

# Time window for the export.
# NOTE(review): the original carried commented-out code reading these from
# sys.argv[1] / sys.argv[2]; wire that back in if the window must be configurable.
start_time = "2020-11-01 00:00:00"
end_time = "2020-11-01 23:59:59"

userName = "xxxx"
password = "xxxx"
dbHost = "xxx.xx.xx.xx"
dbPort = "xxxx"
dbName = "xxxx"

# `xxxx` for the port is a redacted placeholder (a bare name, so the script
# raises NameError until it is replaced with the integer port).
es = Elasticsearch([{"scheme": "http", "host": "xxx.xx.xx.xx", "port": xxxx}])

# track_total_hits makes hits.total report the exact number of matching
# documents instead of capping the count. It does NOT raise the number of
# documents returned: this request still returns at most `size` (10000) hits.
# NOTE(review): if more than 10000 documents can match, page through the
# results (e.g. search_after or the scroll API) - confirm against the cluster.
# "gt" / "lt" are exclusive bounds, so documents stamped exactly at start_time
# or end_time are not matched.
json_body = {
    "from": 0,
    "size": 10000,
    "track_total_hits": True,
    "query": {
        "bool": {
            "must": {
                "match_all": {}
            },
            "filter": {
                "range": {
                    "createTime": {
                        "gt": start_time,
                        "lt": end_time
                    }
                }
            }
        }
    }
}

# index: name of the Elasticsearch index to search.
query = es.search(index="xxxx", body=json_body)

# First page of hits; the document payload lives under each hit's "_source".
results = query['hits']['hits']
# Total number of matching documents as reported by Elasticsearch.
total = query['hits']['total']
print(total)

# One row per hit, one column per "_source" field. Building the frame straight
# from the hits avoids the KeyError that df["_source"] raised on an empty page.
col = pd.DataFrame([hit["_source"] for hit in results])
print(col)

# Start of the timed section (the MySQL insert).
start = time.time()

# SQLAlchemy URL for the MySQL target. The password is URL-quoted so that
# characters such as "@" do not break URL parsing.
db_connect = f"mysql+pymysql://{userName}:{urlquote(password)}@{dbHost}:{dbPort}/{dbName}?charset=utf8"

if col.empty:
    # Nothing matched: skip the insert instead of failing on an empty frame.
    print("No documents matched the query; nothing written to MySQL.")
else:
    engine = create_engine(db_connect)
    # The context manager closes the connection even if to_sql raises.
    with engine.connect() as con:
        # name: target MySQL table; rows are appended if the table already exists.
        col.to_sql(name='xxxx', con=con, if_exists='append')

end = time.time()
print("程序运行结束!!!")
print("程序运行时间:", end - start)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。