赞
踩
自定义数据源,通常使用RichParallelSourceFunciton作为父类,实现open、run、cancel三个方法
方法名 | 作用 |
open | 初始化信息 |
run | 一般使用死循环不断输出数据 |
cancel | 销毁资源 |
- package flink.sourcefunction
- import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
- import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
- import org.apache.flink.api.scala._
- import org.apache.flink.configuration.Configuration
-
- //scala 版
- class RandomSourceFunction extends RichParallelSourceFunction[String]{
- @volatile private var running = true
- //初始化资源
- override def open(parameters: Configuration): Unit = {
- super.open(parameters)
- //初始化资源...
- }
-
- //随机生成数字作为数据源
- override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
- //获取getRuntimeContext
- val number = getRuntimeContext.getNumberOfParallelSubtasks
- val index = getRuntimeContext.getIndexOfThisSubtask
-
- //打印出number和index
- println("这是自定义数据源,number = " + number + " ,index= " + index)
- while(running){
- val random = scala.util.Random.nextInt(100)
- random/(index+1) match {
- //当设置为TimeCharacteristic#EventTime时可使用collectWithTimestamp
- case 0 => ctx.collect(String.valueOf(random))
- case _ =>
- }
- }
- }
- //空实现
- override def cancel(): Unit = {
- //停止输出数据
- running = false
- //销毁资源...
- }
- }

1、getRuntimeContext. getIndexOfThisSubtask 可以得到当前SubTask的index,借此可实现不同的index以不同的方式获取数据或者按某种规则划分数据给相应的index
随机数random模index为0时才会做为源数据emit出去
- random/index match {
- //当设置为TimeCharacteristic#EventTime时可使用collectWithTimestamp
- case 0 => ctx.collect(String.valueOf(random))
- case _ => println(_)
- }
2、线程模型:默认情况下每个SouceFunction实例由其独有线程执行run方法,即不会多个线程并行执行同一个实例run方法的情况,可以理解为单线程执行。
一般情况与CheckpointedFunction同时使用,保证source的高可用。这里需要注意的是checkpoint与emit是不能并行执行的,只能串行化执行。在使用的锁对象的时候一定要使用SourceContext.getCheckpointLock锁对象,这是Flink中的全局锁对象。(备注:1.10以上的版本使用MaixBox后checkpointLock并发模式除了数据源还在用,其它情况下都不使用了)
- //java 版
- public void run(SourceContext<T> ctx) {
- while (isRunning && count < 1000) {
- // this synchronized block ensures that state checkpointing,
- // internal state updates and emission of elements are an atomic operation
- synchronized (ctx.getCheckpointLock()) {
- ctx.collect(count);
- count++;
- }
- }
- }
线程模型
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。