当前位置:   article > 正文

Flink消费pubsub问题_flink pubsubsource

flink pubsubsource

我看网上flink消费pubsub的资料并不多,最近跑通了,大家有问题的可以给我留言。

一、基本资料

1.flink官网接入方式

Google Cloud PubSub | Apache Flink

  1. StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
  2. public class PubsubRecordDeserializer implements PubSubDeserializationSchema<RecordSchema>{...}
  3. SourceFunction<SomeObject> pubsubSource = PubSubSource.newBuilder()
  4. .withDeserializationSchema(new PubsubRecordDeserializer())
  5. .withProjectName("project")
  6. .withSubscriptionName("subscription")
  7. .build();
  8. streamExecEnv.addSource(source);

注意这里是project,不是topic

鉴权方式和反序列化方式后面会讲)

2.maven依赖

  1. <dependency>
  2. <groupId>com.google.cloud</groupId>
  3. <artifactId>google-cloud-pubsub</artifactId>
  4. <version>1.62.0</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.flink</groupId>
  8. <artifactId>flink-connector-gcp-pubsub_2.12</artifactId>
  9. <version>1.14.2</version>
  10. </dependency>

版本自己定。

3.credential鉴权方法

需要先有含有鉴权信息的JSON文件(谷歌授权)

然后有两个方法选一个可以实现

1)设置环境变量 GOOGLE_APPLICATION_CREDENTIALS

值为本地的access key文件。

2)在程序中加载resource文件

  1. PubSubSource.newBuilder()
  2. ....
  3. .withCredentials(ServiceAccountCredentials.fromStream(getAutheficateFile()))

ServiceAccountCredentials这个类可以接入流式文件。返回一个实现了Credentials接口的方法,

  1. public static InputStream getAutheficateFile() {
  2. String configFile = "/key.json";
  3. InputStream credentialsFile = PubsubSubscriber.class.getResourceAsStream(configFile);
  4. return credentialsFile;
  5. }

即可传入参数来用。

4.反序列化方法

点开 withDeserializationSchema方法,发现传入的反序列化对象,需要实现

PubSubDeserializationSchema接口

主要实现接口里的这两个方法

其中 RecordSchema类是自己定义的接收自己需要信息的case class。实现了setter和getter方法。

(核心用户数据在getdata里)

二、碰到的问题

===================================================================

我在开发中 碰到了反序列化PubsubMessage的问题,理解有点偏差,

去StackOverflow提问了下,不过后来自己理解了。

返回的形式已经是一个PubsubMessage类的对象了,指定的schema只是给解析后自己用的。

(之前理解以为要指定一个返回的消息类型的schema,来承接数据接入)

deserialization - What is the proper way to use flink-connector-gcp-pubsub - Stack Overflow

(CSDN居然吞掉StackOverflow的链接

题目是 What is the proper way to use flink-connector-gcp-pubsub

作者 Reina )

=====================================================================

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号