当前位置:   article > 正文

Python利用boto3以及Pyspark操作AWS S3_pyspark 写s3

pyspark 写s3

一、需求背景

  1. 我收到的需求是首先用pyspark将hive数据导出,然后在服务器进行机器学习计算,结果再返回hive仓库,之前一直使用mysql作为中转,实际发现hive数据太大,转到mysql的ETL部分是单线程的很慢,遂改用AWS的s3作为中转,记录常用的操作
  2. Boto3有两种API,低级和高级
  • 低级API:是和AWS的HTTP接口一一对应的,通过boto3.client(“xx”)暴露;
  • 高级API:是面向对象的,通过boto3.resource(“xxx”)暴露,易于使用但是美中不足不一定覆盖所有API。
  1. 在AWS 文档中,经常混用 resource 和 client 两套接口,也没有任何提示,文档的首页除了简单的提了一句有两套 API 外再没有单独的介绍了。高级 API 是很简单易用的,但是跟简单API却是混在一起,难以辨析。网上以及stack上面都是很容易混。在本文中主要使用高级API

二、Pyspark <-> S3的读写

1. Pyspark读取hive表数据写入s3:

from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import Window

ACCESS_KEY_ID = 'xxx'
SECRET_ACCESS_KEY = 'xxx'
END_POINT = 'xxx'
spark = SparkSession \
    .builder \
    .appName("write to S3") \
    .enableHiveSupport() \
    .config("spark.debug.maxToStringFields", "500") \
    .getOrCreate()
    
#设置ceph的信息
hadoopConf = spark.sparkContext._jsc.hadoopConfiguration()
hadoopConf.set("fs.s3a.endpoint", END_POINT)
hadoopConf.set("fs.s3a.access.key", ACCESS_KEY_ID)
hadoopConf.set("fs.s3a.secret.key", SECRET_ACCESS_KEY)

sql = '''
    hive sql取数
'''
bucket_name = 'xxx'
bucket_path = f"s3a://{bucket_name}"
spark.sql(sql).repartition(1).write.mode("overwrite").parquet(bucket_path)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

2. Pyspark读取s3数据写入hive表:

跟上面类似,只需要把最后的写入改为读即可

hive_path = 'xxx'
df = spark.read.parquet(bucket_path)
df.write.parquet(f'{hive_path}', mode='overwrite')
  • 1
  • 2
  • 3

比较麻烦的一点是要注意,写入s3时候的parquet文件只能控制文件夹的名字,而里面的文件名往往是类似‘part-00000-c6089499-0ad2-4ff8-aae4-7c64f291c728-c000.snappy.parquet’这样的。这是由于文件多线程写入时,hive为了保证唯一性。暂时未找到方法解决。如果使用toPandas().to_csv()中途可能会报内存不足GC

三、Boto3读写s3上的文件

1. Boto3读写

这里主要是用的是boto3来处理,比较好用

import json
import boto3

with open('credentials.json', 'r') as fd:
    credentials = json.loads(fd.read())
    
s3 = boto3.resource('s3',
                    endpoint_url=credentials['endpoint_url'],
                    aws_access_key_id=credentials['access_key'],                aws_secret_access_key=credentials['secret_key'])
bucket = s3.Bucket(credentials['bucket_name'])

# 如上文所说文件名字无法控制,因此key需要用prefix遍历筛选出来
for obj in bucket.objects.filter(Prefix='xx'):
    key = obj.key
bucket.download_file(Filename='xx.parquet',
                          Key=key)
bucket.upload_file(Filename='xx.parquet', 
                        Key=key)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

如果不想让文件下载到本地占用空间,可以直接object转为dataframe,速度也很快

import io
key = 'key_in_s3'
buffer = io.BytesIO()
s3.Object(bucket_name=credentials['bucket_name'], key=key).download_fileobj(buffer)
df = pd.read_parquet(buffer)
  • 1
  • 2
  • 3
  • 4
  • 5

上传这里也可以直接dataframe上传,但是类似的api我一直传上去是空的,无奈用了简单API

up_buffer = io.BytesIO()
df.to_parquet(up_buffer, index=False)

s3.Object(bucket_name=credentials['bucket_name'],\
		key='xx.parquet')\
     	.put(Body=up_buffer.getvalue())

# s3.Object(bucket_name=credentials['bucket_name'], 
# key='xx.parquet')\
# .upload_fileobj(up_buffer.getvalue())
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

2.其他用法

也有一些其他的用法,这里大概列举一些

import boto3

s3 = boto3.resource("s3")

# 创建一个 bucket
bucket = s3.create_bucket(Bucket="my-bucket")

# 获得所有的 bucket, boto 会自动处理 API 的翻页等信息。
for bucket in s3.buckets.all():
    print(bucket.name)

# 过滤 bucket, 同样返回一个 bucket_iterator
s3.buckets.fitler()

# 生成一个 Bucket 资源对象
bucket = s3.Bucket("my-bucket")
bucket.name  # bucket 的名字
bucket.delete()  # 删除 bucket

# 下载文件
bucket.download_file(Key, Filename, ExtraArgs=None, Callback=None, Config=None)

# 列出所有对象
bucket.objects.all()

# 过滤并返回对象
objects = bucket.objects.filter(
    Delimiter='string',
    EncodingType='url',
    Marker='string',
    MaxKeys=123,
    Prefix='string',
    RequestPayer='requester',
    ExpectedBucketOwner='string'
)

# 创建一个对象
obj = bucket.Object("xxx")
# 或者
obj = s3.Object("my-bucket", "key")

# 删除对象
obj.delete()
# 下载对象
obj.download_file(path)
# 自动多线程下载
with open('filename', 'wb') as data:
    obj.download_fileobj(data)
# 获取文件内容
rsp = obj.get()
body = rsp["Body"].read()  # 文件内容

# 上传文件
obj.upload_file(filename)
# 自动多线程上传
obj.upload_fileobj(fileobj)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

其他的API使用可以查阅reference的官方文档,建议只阅读高级API部分即可

Reference

  1. aws s3 文档
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/木道寻08/article/detail/917885
推荐阅读
相关标签
  

闽ICP备14008679号