赞
踩
from pyspark.sql import SparkSession, SQLContext from pyspark.sql import Window ACCESS_KEY_ID = 'xxx' SECRET_ACCESS_KEY = 'xxx' END_POINT = 'xxx' spark = SparkSession \ .builder \ .appName("write to S3") \ .enableHiveSupport() \ .config("spark.debug.maxToStringFields", "500") \ .getOrCreate() #设置ceph的信息 hadoopConf = spark.sparkContext._jsc.hadoopConfiguration() hadoopConf.set("fs.s3a.endpoint", END_POINT) hadoopConf.set("fs.s3a.access.key", ACCESS_KEY_ID) hadoopConf.set("fs.s3a.secret.key", SECRET_ACCESS_KEY) sql = ''' hive sql取数 ''' bucket_name = 'xxx' bucket_path = f"s3a://{bucket_name}" spark.sql(sql).repartition(1).write.mode("overwrite").parquet(bucket_path)
跟上面类似,只需要把最后的写入改为读即可
hive_path = 'xxx'
df = spark.read.parquet(bucket_path)
df.write.parquet(f'{hive_path}', mode='overwrite')
比较麻烦的一点是要注意,写入s3时候的parquet文件只能控制文件夹的名字,而里面的文件名往往是类似‘part-00000-c6089499-0ad2-4ff8-aae4-7c64f291c728-c000.snappy.parquet’这样的。这是由于文件多线程写入时,hive为了保证唯一性。暂时未找到方法解决。如果使用toPandas().to_csv()
中途可能会报内存不足GC
这里主要是用的是boto3来处理,比较好用
import json import boto3 with open('credentials.json', 'r') as fd: credentials = json.loads(fd.read()) s3 = boto3.resource('s3', endpoint_url=credentials['endpoint_url'], aws_access_key_id=credentials['access_key'], aws_secret_access_key=credentials['secret_key']) bucket = s3.Bucket(credentials['bucket_name']) # 如上文所说文件名字无法控制,因此key需要用prefix遍历筛选出来 for obj in bucket.objects.filter(Prefix='xx'): key = obj.key bucket.download_file(Filename='xx.parquet', Key=key) bucket.upload_file(Filename='xx.parquet', Key=key)
如果不想让文件下载到本地占用空间,可以直接object转为dataframe,速度也很快
import io
key = 'key_in_s3'
buffer = io.BytesIO()
s3.Object(bucket_name=credentials['bucket_name'], key=key).download_fileobj(buffer)
df = pd.read_parquet(buffer)
上传这里也可以直接dataframe上传,但是类似的api我一直传上去是空的,无奈用了简单API
up_buffer = io.BytesIO()
df.to_parquet(up_buffer, index=False)
s3.Object(bucket_name=credentials['bucket_name'],\
key='xx.parquet')\
.put(Body=up_buffer.getvalue())
# s3.Object(bucket_name=credentials['bucket_name'],
# key='xx.parquet')\
# .upload_fileobj(up_buffer.getvalue())
也有一些其他的用法,这里大概列举一些
import boto3 s3 = boto3.resource("s3") # 创建一个 bucket bucket = s3.create_bucket(Bucket="my-bucket") # 获得所有的 bucket, boto 会自动处理 API 的翻页等信息。 for bucket in s3.buckets.all(): print(bucket.name) # 过滤 bucket, 同样返回一个 bucket_iterator s3.buckets.fitler() # 生成一个 Bucket 资源对象 bucket = s3.Bucket("my-bucket") bucket.name # bucket 的名字 bucket.delete() # 删除 bucket # 下载文件 bucket.download_file(Key, Filename, ExtraArgs=None, Callback=None, Config=None) # 列出所有对象 bucket.objects.all() # 过滤并返回对象 objects = bucket.objects.filter( Delimiter='string', EncodingType='url', Marker='string', MaxKeys=123, Prefix='string', RequestPayer='requester', ExpectedBucketOwner='string' ) # 创建一个对象 obj = bucket.Object("xxx") # 或者 obj = s3.Object("my-bucket", "key") # 删除对象 obj.delete() # 下载对象 obj.download_file(path) # 自动多线程下载 with open('filename', 'wb') as data: obj.download_fileobj(data) # 获取文件内容 rsp = obj.get() body = rsp["Body"].read() # 文件内容 # 上传文件 obj.upload_file(filename) # 自动多线程上传 obj.upload_fileobj(fileobj)
其他的API使用可以查阅reference的官方文档,建议只阅读高级API部分即可
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。