当前位置:   article > 正文

如何用Python Faust构建流数据管道(下篇)_python 流数据处理引擎

python 流数据处理引擎

本文是如何用Python和Faust创建流处理管道系列文章的第二部分。如果您还不熟悉Faust的一般概念,建议先阅读系列文章的第一部分。

今天,我们将建立一个简单的流处理管道,包含多个任务。这是Kafka streams的常见用例,也是探索Faust的一种有趣方式。

好的,现在让我们动起来!

项目布局

在开始新项目和学习新技术时,最令人沮丧的事情之一就是设置项目结构。在单个Python模块中启动Faust项目是完全可行的,但是如果您打算创建多个流处理任务,则最好从设置项目结构开始。

Faust为大中型项目提供了建议的布局,这种方法将以可安装库的形式分发项目。我们不会这样做,但仍将重用大多数建议的目录结构。

我们的项目结构如下:

+ pipeline/
    + pipeline/
        - __init__.py
        - __main__.py
        - app.py
        + fetcher/
            - __init__.py
            - agents.py
            - models.py
        + normaliser/
            - __init__.py
            - agents.py
            - models.py
    + requirements/
        - base.txt
        - prod.txt
        - test.txt
    + tests/
        - __init__.py
        - test_normaliser.py
    - Dockerfile
    - docker-compose.yaml
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

顶级文件夹pipeline是项目根目录,包含所有其他文件和目录,其中最重要的是嵌套的pipeline目录,包含app.py模块。

先创建项目文件夹。

mkdir pipeline && cd pipeline/
  • 1

在项目根目录下创建Dockerfile和docker-compose.yml.

touch Dockerfile && \
touch docker-compose.yaml
  • 1
  • 2

创建嵌套的pipeline文件夹,存储源代码。

mkdir pipeline && \
mkdir requirements && \
mkdir tests
  • 1
  • 2
  • 3

创建requirements.txt文件。

printf "%s\n" "faust==1.10.4" "pytz==2021.1" "requests==2.25.1" > requirements/base.txt && \
printf "%s\n" "-r base.txt" > requirements/prod.txt && \
printf "%s\n" "-r base.txt" "pytest==6.2.3" > requirements/test.txt
  • 1
  • 2
  • 3

现在,进入我们的主应用程序目录。

cd pipeline/
  • 1

Faust程序主入口

现在我们位于主应用程序目录( pipeline/pipeline/ )中,为Faust程序创建顶级文件。

touch __init__.py && \
touch __main__.py && \
touch app.py
  • 1
  • 2
  • 3

我们还将为管道任务创建顶级目录。

mkdir fetcher && \
mkdir normaliser
  • 1
  • 2

将数据管道逻辑写入app.py

# app.py
import faust

VERSION = 1

PROJECT = "pipeline"  # our root directory
ORIGIN = "pipeline"  # our app directory

AUTODISCOVER = [
    f"{ORIGIN}.fetcher",
    f"{ORIGIN}.normaliser",
]

BROKER = "kafka://kafka:9092"

app = faust.App(
    PROJECT,
    version=VERSION,
    autodiscover=AUTODISCOVER,
    origin
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/爱喝兽奶帝天荒/article/detail/927139
推荐阅读
相关标签
  

闽ICP备14008679号