当前位置:   article > 正文

Flink之SourceFuction_flink sourcefunction

flink sourcefunction

SourceFunction

自定义数据源,通常使用RichParallelSourceFunciton作为父类,实现open、run、cancel三个方法

方法名

作用

open

初始化信息

run

一般使用死循环不断输出数据

cancel

销毁资源

  1. package flink.sourcefunction
  2. import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
  3. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
  4. import org.apache.flink.api.scala._
  5. import org.apache.flink.configuration.Configuration
  6. //scala 版
  7. class RandomSourceFunction extends RichParallelSourceFunction[String]{
  8.   @volatile private var running = true
  9.   //初始化资源
  10.   override def open(parameters: Configuration): Unit = {
  11.     super.open(parameters)
  12.     //初始化资源...
  13.   }
  14.   //随机生成数字作为数据源
  15.   override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
  16.     //获取getRuntimeContext
  17.     val number = getRuntimeContext.getNumberOfParallelSubtasks
  18.     val index = getRuntimeContext.getIndexOfThisSubtask
  19.     //打印出number和index
  20.     println("这是自定义数据源,number = " + number + " ,index= " + index)
  21.     while(running){
  22.       val random = scala.util.Random.nextInt(100)
  23.       random/(index+1) match {
  24.           //当设置为TimeCharacteristic#EventTime时可使用collectWithTimestamp
  25.         case 0 => ctx.collect(String.valueOf(random))
  26.         case _ =>
  27.       }
  28.     }
  29.   }
  30.   //空实现
  31.   override def cancel(): Unit = {
  32.     //停止输出数据
  33.     running = false
  34.     //销毁资源...
  35.   }
  36. }

 

1、getRuntimeContext. getIndexOfThisSubtask 可以得到当前SubTask的index,借此可实现不同的index以不同的方式获取数据或者按某种规则划分数据给相应的index

随机数random模index为0时才会做为源数据emit出去

  1. random/index match {
  2.     //当设置为TimeCharacteristic#EventTime时可使用collectWithTimestamp
  3.     case 0 => ctx.collect(String.valueOf(random))
  4.     case _ => println(_)
  5. }

2线程模型:默认情况下每个SouceFunction实例由其独有线程执行run方法,即不会多个线程并行执行同一个实例run方法的情况,可以理解为单线程执行。

与CheckpointedFunction

一般情况与CheckpointedFunction同时使用,保证source的高可用。这里需要注意的是checkpoint与emit是不能并行执行的,只能串行化执行。在使用的锁对象的时候一定要使用SourceContext.getCheckpointLock锁对象,这是Flink中的全局锁对象。(备注:1.10以上的版本使用MaixBox后checkpointLock并发模式除了数据源还在用,其它情况下都不使用了)

  1. //java 版
  2. public void run(SourceContext<T> ctx) {
  3. while (isRunning && count < 1000) {
  4. // this synchronized block ensures that state checkpointing,
  5. // internal state updates and emission of elements are an atomic operation
  6. synchronized (ctx.getCheckpointLock()) {
  7. ctx.collect(count);
  8. count++;
  9. }
  10. }
  11. }

FlinkKafkaConsumer

线程模型

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/天景科技苑/article/detail/794447
推荐阅读
相关标签
  

闽ICP备14008679号