赞
踩
本文是如何用Python和Faust创建流处理管道系列文章的第二部分。如果您还不熟悉Faust的一般概念,建议先阅读系列文章的第一部分。
今天,我们将建立一个简单的流处理管道,包含多个任务。这是Kafka streams的常见用例,也是探索Faust的一种有趣方式。
好的,现在让我们动起来!
在开始新项目和学习新技术时,最令人沮丧的事情之一就是设置项目结构。在单个Python模块中启动Faust项目是完全可行的,但是如果您打算创建多个流处理任务,则最好从设置项目结构开始。
Faust为大中型项目提供了建议的布局,这种方法将以可安装库的形式分发项目。我们不会这样做,但仍将重用大多数建议的目录结构。
我们的项目结构如下:
+ pipeline/ + pipeline/ - __init__.py - __main__.py - app.py + fetcher/ - __init__.py - agents.py - models.py + normaliser/ - __init__.py - agents.py - models.py + requirements/ - base.txt - prod.txt - test.txt + tests/ - __init__.py - test_normaliser.py - Dockerfile - docker-compose.yaml
顶级文件夹pipeline
是项目根目录,包含所有其他文件和目录,其中最重要的是嵌套的pipeline
目录,包含app.py
模块。
先创建项目文件夹。
mkdir pipeline && cd pipeline/
在项目根目录下创建Dockerfile和docker-compose.yml.
touch Dockerfile && \
touch docker-compose.yaml
创建嵌套的pipeline
文件夹,存储源代码。
mkdir pipeline && \
mkdir requirements && \
mkdir tests
创建requirements.txt
文件。
printf "%s\n" "faust==1.10.4" "pytz==2021.1" "requests==2.25.1" > requirements/base.txt && \
printf "%s\n" "-r base.txt" > requirements/prod.txt && \
printf "%s\n" "-r base.txt" "pytest==6.2.3" > requirements/test.txt
现在,进入我们的主应用程序目录。
cd pipeline/
现在我们位于主应用程序目录( pipeline/pipeline/
)中,为Faust程序创建顶级文件。
touch __init__.py && \
touch __main__.py && \
touch app.py
我们还将为管道任务创建顶级目录。
mkdir fetcher && \
mkdir normaliser
将数据管道逻辑写入app.py
。
# app.py import faust VERSION = 1 PROJECT = "pipeline" # our root directory ORIGIN = "pipeline" # our app directory AUTODISCOVER = [ f"{ORIGIN}.fetcher", f"{ORIGIN}.normaliser", ] BROKER = "kafka://kafka:9092" app = faust.App( PROJECT, version=VERSION, autodiscover=AUTODISCOVER, origin
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。