This example reads data from one MongoDB collection and writes it into another collection.
The basic Flink processing flow splits cleanly into a few stages: build a source, create the execution environment, apply transformations, attach a sink, and execute the job, as the complete example below shows:
package com.wfg.flink.connector.mongodb;

import com.alibaba.fastjson2.JSON;
import com.mongodb.client.model.InsertOneModel;
import com.wfg.flink.connector.mongodb.model.WellCastingInfo;
import com.wfg.flink.connector.mongodb.schema.WellCastingInfoDeserializationSchema;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.mongodb.sink.MongoSink;
import org.apache.flink.connector.mongodb.source.MongoSource;
import org.apache.flink.connector.mongodb.source.enumerator.splitter.PartitionStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.bson.BsonDocument;

/**
 * @author wfg
 */
@Slf4j
public class Main {
    public static void main(String[] args) throws Exception {
        // Build the MongoDB source
        MongoSource<WellCastingInfo> mongoSource = MongoSource.<WellCastingInfo>builder()
                .setUri("mongodb://root:123456@127.0.0.1:27017,127.0.0.1:27018,127.0.0.1:27019/admin?replicaSet=rs0&authSource=admin")
                .setDatabase("uux")
                .setCollection("castingInfo")
                // .setProjectedFields("_id", "f0", "f1")
                .setFetchSize(2048)
                .setLimit(10000)
                .setNoCursorTimeout(true)
                .setPartitionStrategy(PartitionStrategy.SAMPLE)
                .setPartitionSize(MemorySize.ofMebiBytes(64))
                .setSamplesPerPartition(10)
                .setDeserializationSchema(new WellCastingInfoDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read data from MongoDB
        DataStream<WellCastingInfo> sourceStream =
                env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "Mongo Source");

        // Apply transformations (if needed)
        DataStream<WellCastingInfo> transformedStream =
                sourceStream.map((MapFunction<WellCastingInfo, WellCastingInfo>) value -> {
                    // transformation logic
                    return value;
                });

        // Build the MongoDB sink
        MongoSink<WellCastingInfo> sink = MongoSink.<WellCastingInfo>builder()
                .setUri("mongodb://root:123456@127.0.0.1:27017,127.0.0.1:27018,127.0.0.1:27019/admin?replicaSet=rs0&authSource=admin")
                .setDatabase("uux")
                .setCollection("castingInfo_back")
                .setMaxRetries(3)
                // .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .setSerializationSchema(
                        (input, context) -> new InsertOneModel<>(BsonDocument.parse(JSON.toJSONString(input))))
                .build();

        transformedStream.sinkTo(sink);

        // Execute the job
        env.execute("Mongo Flink Demo");
    }
}
package com.wfg.flink.connector.mongodb.schema;

import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.JSONReader;
import com.wfg.flink.connector.mongodb.model.WellCastingInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema;
import org.bson.BsonDocument;

import java.util.Date;

/**
 * @author wfg
 */
@Slf4j
public class WellCastingInfoDeserializationSchema implements MongoDeserializationSchema<WellCastingInfo> {

    @Override
    public WellCastingInfo deserialize(BsonDocument bsonDocument) {
        WellCastingInfo rs = null;
        try {
            JSONObject obj = JSONObject.parseObject(bsonDocument.toJson());
            // Drop fields that fastjson2 cannot map directly; _id and time are handled below
            obj.remove("_id");
            obj.remove("time");
            obj.remove("_class");
            rs = obj.to(WellCastingInfo.class, JSONReader.Feature.IgnoreAutoTypeNotMatch);
            if (bsonDocument.getObjectId("_id") != null) {
                rs.setId(bsonDocument.getObjectId("_id").getValue().toString());
            }
            if (bsonDocument.get("time") != null) {
                rs.setTime(new Date(bsonDocument.getDateTime("time").getValue()));
            }
        } catch (Exception e) {
            log.error("Malformed document: {}", bsonDocument.toJson(), e);
        }
        return rs;
    }

    @Override
    public TypeInformation<WellCastingInfo> getProducedType() {
        return TypeExtractor.getForClass(WellCastingInfo.class);
    }
}
package com.wfg.flink.connector.mongodb.model;

import lombok.Data;

import java.util.Date;

/**
 * @author wfg
 */
@Data
public class WellCastingInfo {
    private String id;
    private String comCode;
    private Date time;
    private String yjsfzt;
    private String yjsyl;
    private String jjaqzfzt;
    private String spjk01;
    private String jyjqy;
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.wfg.flink.connector</groupId>
    <artifactId>connector-mongodb</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.18.1</flink.version>
        <log4j.version>2.14.1</log4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.30</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-mongodb</artifactId>
            <version>1.1.0-1.18</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.50</version>
        </dependency>
    </dependencies>
</project>
MongoSource refers to the data source (Source) used to read data from a MongoDB database, either a custom implementation or, as in this example, the one shipped with the official Flink MongoDB connector. The connector is pulled in with the following dependencies:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-mongodb</artifactId>
<version>1.1.0-1.18</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<YourDataType> dataStream = env.addSource(new YourMongoSource());
// ... subsequent data processing and transformation operations ...
Notes:
- Make sure the MongoDB server address, port, and credentials are configured correctly in the MongoSource.
- Adjust the query conditions, paging/fetch parameters, and so on as needed to control how much data is read from MongoDB and how often (see the sketch after this list).
- If the Flink job has to process a large amount of data, consider adding MongoDB indexes to speed up the queries.
- After the data has been processed, make sure the connection to MongoDB is closed to avoid resource leaks.
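A minimal sketch of the tuning point above, using only options the connector's builder already exposes in the full example (setProjectedFields, setFetchSize, setLimit); the projected field names and the numbers are illustrative, not recommendations:
MongoSource<WellCastingInfo> boundedSource = MongoSource.<WellCastingInfo>builder()
        .setUri("mongodb://root:123456@127.0.0.1:27017/admin")
        .setDatabase("uux")
        .setCollection("castingInfo")
        .setProjectedFields("comCode", "yjsyl")  // read only the fields that are actually needed (illustrative)
        .setFetchSize(1024)                      // documents fetched per cursor round trip
        .setLimit(1000)                          // cap the total number of documents read
        .setDeserializationSchema(new WellCastingInfoDeserializationSchema())
        .build();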
env.fromSource attaches a source built on Flink's unified Source API (FLIP-27) to the job. Here env refers to the StreamExecutionEnvironment (or, for batch jobs, ExecutionEnvironment) object that sets up the context and execution environment for the Flink job.
To read data from an external system into a Flink job, a source is created through methods on the env object. With the older streaming API this was env.addSource(sourceFunction), where sourceFunction is a class that implements the SourceFunction interface or extends RichParallelSourceFunction and defines how to read from the external system (Kafka, a file system, a database, and so on).
For common external systems, Flink provides predefined connectors and source functions that can be used directly, without implementing SourceFunction yourself; the MongoDB connector used in this article is one of them.
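Reusing the mongoSource built at the top of this article, the unified API is wired up like this (taken from the full example; "Mongo Source" is only the name shown in the Flink web UI):
DataStream<WellCastingInfo> sourceStream = env.fromSource(
        mongoSource,                       // any implementation of the unified Source interface
        WatermarkStrategy.noWatermarks(),  // this demo does not use event time, so no watermarks are generated
        "Mongo Source");                   // source name displayed in the Flink web UI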
Operators are the core building blocks of data processing in Flink: they define how a data stream (DataStream) or data set (DataSet) is transformed and processed. Flink ships with a rich operator library (map, flatMap, filter, keyBy, reduce, window operations, and more) to support all kinds of processing tasks; a few of the most common ones are sketched below.
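This is a hedged sketch only: the filter predicate and the key are made up for illustration and applied to the WellCastingInfo stream from the example above (extra imports noted in the comments):
// requires: org.apache.flink.api.common.functions.FilterFunction,
//           org.apache.flink.api.common.functions.ReduceFunction,
//           org.apache.flink.api.common.typeinfo.Types

// filter: keep only documents that actually carry a company code
DataStream<WellCastingInfo> withComCode =
        sourceStream.filter((FilterFunction<WellCastingInfo>) info -> info.getComCode() != null);

// keyBy + reduce: group by company code and keep the last element seen per key
DataStream<WellCastingInfo> latestPerCompany = withComCode
        .keyBy(WellCastingInfo::getComCode, Types.STRING)
        .reduce((ReduceFunction<WellCastingInfo>) (a, b) -> b);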
Internally, the connector's MongoSink.createWriter builds the MongoWriter and passes along whether the configured delivery guarantee is AT_LEAST_ONCE:
public SinkWriter<IN> createWriter(Sink.InitContext context) {
    return new MongoWriter(
            this.connectionOptions,
            this.writeOptions,
            this.writeOptions.getDeliveryGuarantee() == DeliveryGuarantee.AT_LEAST_ONCE,
            context,
            this.serializationSchema);
}
The MongoSink used in this example is configured through its builder:
MongoSink<WellCastingInfo> sink = MongoSink.<WellCastingInfo>builder()
.setUri("mongodb://root:123456@127.0.0.1:27017,127.0.0.1:27018,127.0.0.1:27019/admin?replicaSet=rs0&authSource=admin")
.setDatabase("sjzz")
.setCollection("wellCastingInfo_back")
.setMaxRetries(3)
// .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.setSerializationSchema(
(input, context) -> new InsertOneModel<>(BsonDocument.parse(JSON.toJSONString(input))))
.build();
Before Flink 1.12, a sink was attached by calling addSink on the DataStream:
stream.addSink(new MySinkFunction()); // MySinkFunction is a user-defined class implementing SinkFunction (the name is illustrative)
Starting with Flink 1.12, Flink reworked the Sink architecture, and sinks are attached with sinkTo:
stream.sinkTo(sink); // sink is an implementation of the new unified Sink interface, e.g. the MongoSink built above
env.execute() is the call that actually launches the Flink job. Here env is an instance of StreamExecutionEnvironment or ExecutionEnvironment, used with the DataStream API and the DataSet API respectively.
Once the job has been assembled, with its sources, transformations, and sinks defined, env.execute() must be called to hand it over to the Flink runtime for execution.
Note that once env.execute() is called, the Flink runtime starts executing the job and the call blocks until the job finishes or fails. If the program should continue doing other work after the job is submitted, consider submitting it to a remote cluster and carrying on locally, typically via Flink's cluster client (ClusterClient) or the corresponding REST API.
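Purely as a side note, and not part of the original demo: StreamExecutionEnvironment also offers executeAsync, which submits the job and returns a JobClient instead of blocking, so the caller can keep working. A minimal sketch:
// requires org.apache.flink.core.execution.JobClient
JobClient jobClient = env.executeAsync("Mongo Flink Demo"); // returns once the job has been submitted
System.out.println("Submitted job " + jobClient.getJobID()); // the program can continue with other work here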