A small script that uploads a CSV file to Hive and automatically creates the matching table.
# coding=utf8 """ requirements: tableschema sqlalchemy pandas pyarrow pyhive """ from __future__ import absolute_import from __future__ import division from __future__ import print_function from __future__ import unicode_literals from tableschema import Table from sqlalchemy import create_engine import pandas as pd import logging import numpy as np import time import os logging_format = '%(asctime)s pid:%(process)d %(levelname)s %(module)s - %(message)s' logging.basicConfig(level=logging.INFO, format=logging_format, filemode='a') SCHEMA_HIVE_TYPES = { "integer": "BIGINT", "number": "DOUBLE", "string": "STRING", "int": "BIGINT" } class HiveUploader(object): """ 上传csv, 在hive建表 """ def __init__(self, source_file, table_name, location, sqlalchemy_uri, if_exits="fail"): """ hive uploader init params :param source_file: file path for csv to upload :param table_name: table_name to be created :param location: oss or s3 or hdfs :param sqlalchemy_uri: hive connection uri, e.g. "hive://{username}:{password}@{host}:{port}/default" :param if_exits: replace, append, or fail, default is "fail" """ self._source_file = source_file self._target_file = os.path.join("/tmp", table_name) self._table_name = table_name self._location = location self._engine = create_engine(sqlalchemy_uri, connect_args={"auth": "LDAP"}) self._if_exits = if_exits self._db_name = table_name.split(".")[0] self._tbl_name = table_name.split(".")[1] def _drop_table(self): statement = """drop table if exists %s""" % self._table_name self._engine.execute(statement) def _check_if_exits(self): """ check if the table_name exists. :return: """ sql = """show tables from {} like '{}'""".format(self._db_name, self._tbl_name) df = pd.read_sql_query(sql, self._engine) exist = not df.empty logging.info("table %s exists status: %s" % (table_name, exist)) return not df.empty def _drop_table(self): """ drop table before create :return: """ sql = """drop table if exists {}""".format(self._table_name) logging.info(sql) self._engine.execute(sql) def replace(self): # 1: drop table self._drop_table() self.upload() def repair_table(self): sql = "msck repair table {}".format(self._table_name) self._engine.execute(sql) def append(self): target_file, create_sql = self.prepare_upload() HiveUploader.upload_to_oss(target_file, self._location + str(int(time.time()))) self.repair_table() def prepare_upload(self): # parse columns df = pd.read_csv(source_file) table_schema = Table(source_file).infer() fields = table_schema.get("fields") schema = {} columns = [] for field in fields: column_name = field['name'] col_type = field["type"] if column_name not in df.columns: continue if col_type in SCHEMA_HIVE_TYPES: column_type = SCHEMA_HIVE_TYPES[col_type] else: column_type = "STRING" schema[column_name] = column_type columns.append(column_name) df.columns = columns for column_name in columns: column_type = schema[column_name] try: if column_type == "BIGINT": df[column_name].fillna(0, inplace=True) df[column_name] = df[column_name].astype(np.int64) elif column_type == "DOUBLE": df[column_name].fillna(0, inplace=True) df[column_name] = df[column_name].astype(np.float64) else: df[column_name].fillna('', inplace=True) df[column_name] = df[column_name].astype(str) except: df[column_name] = df[column_name].astype(str) schema[column_name] = "STRING" df.tail().to_parquet(self._target_file) column_defs = ["`{}` {}".format(c, schema[c]) for c in columns] column_definition = ",\n ".join(column_defs) location, file_name = os.path.split(self._location) create_sql = """ create table {} 
( {} ) STORED AS parquet location '{}' """.format(self._table_name, column_definition, location) return self._target_file, create_sql @staticmethod def upload_to_oss(target_file, oss_path): rm = "oss rm " + oss_path os.system(rm) cp = "oss cp {} {}".format(target_file, oss_path) os.system(cp) def upload(self): """upload to oss and create table""" logging.info("start to upload file to oss!") target_file, create_sql = self.prepare_upload() # upload file to oss HiveUploader.upload_to_oss(target_file, self._location) # create table self._engine.execute(create_sql) def run(self): # 1: check if table exists exists = self._check_if_exits() if exists: if self._if_exits == 'fail': msg = "table_name {} is already exists!".format(self._table_name) logging.error(msg) raise Exception(msg) elif self._if_exits == "append": self.append() elif self._if_exits == "replace": self.replace() else: self.upload() sql = """select * from {} limit 100""".format(self._table_name) df = pd.read_sql_query(sql, self._engine) print(df) if __name__ == '__main__': hive_uri = "hive://{username}:{password}@1{ip}:{port}/default" source_file = "~/Desktop/magic.csv" table_name = "test.test_upload_here_test_v1" location = "oss://{bucket_name}//{}/{}.parquet".format(table_name, table_name) hive_uploader = HiveUploader(source_file, table_name, location, hive_uri, if_exits="fail") hive_uploader.run()
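For reference, this is roughly the DDL that prepare_upload generates. The table name, columns and bucket here are illustrative (not from the original post): a CSV with an integer column id, a string column name and a numeric column score, uploaded as test.demo with location "oss://{bucket_name}/test.demo/test.demo.parquet", would produce approximately:

    create table test.demo
    (
        `id` BIGINT,
        `name` STRING,
        `score` DOUBLE
    )
    STORED AS parquet
    location 'oss://{bucket_name}/test.demo'

Because the statement points at the directory containing the parquet file (os.path.split drops the file name), append mode only has to copy additional parquet files into that directory and run msck repair table.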