赞
踩
接到一个这样的需求:kafka接收实时数据流,需要根据MySQL中的一张表的某个字段进行过滤,然后再写入到MySQL中
Kafka中的数据格式为json格式,跟后端沟通发现MySQL中配置表基本上是不会变化的,但是保险起见还是使用Flinkcdc了,读取MySQL形成配置流在广播出去
首先,准备一些测试数据,读取kafka数据和mysql数据,想着简单一点直接flinksql建表读取
tenv.executeSql("create table AISData(" + " mmsi string," + " heading string," + " rot string," + " course string," + " lon double," + " `time` string," + " create_at string," + " lat double," + " speed string," + " status string," + " proTime as PROCTIME()" + ")with(" + " 'connector' = 'kafka', " + " 'properties.bootstrap.servers' = '192.168.1.102:9092', " + " 'properties.group.id' = 'AISData02', " + " 'topic' = '" + "test-ais" + "', " + " 'scan.startup.mode' = 'earliest-offset', " + " 'format' = 'json' " + ")"); Table t1 = tenv.sqlQuery("select mmsi,\n" + " heading,\n" + " rot,\n" + " course,\n" + " lon,\n" + " `time`,\n" + " create_at,\n" + " lat,\n" + " speed,\n" + " status\n" + "from AISData"); tenv.createTemporaryView("t1",t1); DataStream<AISdata> aiSdataDataStream = tenv.toDataStream(t1, AISdata.class);
//TODO 使用flinkcdc读取mysql中配置信息
MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("localhost")
.port(3306)
.databaseList("aisdata")
.tableList("aisdata.vessel_info_main")
.username("root")
.password("123456")
.deserializer(new JsonDebeziumDeserializationSchema())
.startupOptions(StartupOptions.initial())
.build();
DataStreamSource<String> mysqlSource = env.fromSource(mySqlSource,
WatermarkStrategy.noWatermarks(), "MysqlSource");
//筛选出配置流中的关键判定字段 SingleOutputStreamOperator<String> map = mysqlSource.map(new MapFunction<String, String>() { @Override public String map(String value) throws Exception { JSONObject jsonObject = JSON.parseObject(value); Tab after = jsonObject.getObject("after", Tab.class); return after.getMmsi(); } }); MapStateDescriptor<String, ship> stringshipMapStateDescriptor = new MapStateDescriptor<String, ship>("state",String.class, ship.class); //mysql形成广播流 BroadcastStream<String> broadcast = map.broadcast(stringshipMapStateDescriptor); //主流关联广播流 BroadcastConnectedStream<AISdata, String> connect = aiSdataDataStream.connect(broadcast); //业务处理逻辑 SingleOutputStreamOperator<AISdata> singleOutputStreamOperator = connect.process(new MyBroadcastFunctioncdc(stringshipMapStateDescriptor)); singleOutputStreamOperator.print("result");
到这里一切正常,其中关联逻辑非常简单,让广播流进入状态,然后Kafka流根据状态中的特定字段进行判定,有就输出没有就扔掉

因为测试我是用自己本地mysql,版本是5.1.27,当我使用后端的生产环境数据库MySQL8时,发现无论无何这个广播流在刚开始没有加载到状态中,这样会导致一部分数据漏掉。那怎样让广播流优先加载到状态中?经过搜索,发现一篇文章说可以采取主流阻塞的方法,为此我添加一个阻塞的逻辑,主流第一个元素过来阻塞5秒,之后再去处理这第一个元素
@Override public void processElement(AISdata value, ReadOnlyContext ctx, Collector<AISdata> out) throws Exception { String mmsi = value.getMmsi(); ReadOnlyBroadcastState<String, ship> broadcastState = ctx.getBroadcastState(broadcastLoadedState); if (anInt == 1){ Thread.sleep(5000); listState.add(value); } if (anInt != 1){ if (anInt == 2){ for (AISdata aiSdata: listState){ if (broadcastState.contains(aiSdata.getMmsi())){ out.collect(aiSdata); anInt++; } } } if (anInt == 3){ if (broadcastState.contains(mmsi)){ System.out.println("--------"); out.collect(value); } } } if (anInt==1) anInt++; }
但是发现并没有用,百思不得其解,这个广播流为什么进入不了状态呢?最终无奈打电话摇人
我:大佬麻烦帮我看看这个广播流为啥关联不上呢?我还特意阻塞了主流
大佬:你先这样这样在这样………你看你的阻塞是不起作用的,你从哪里学的?
我:csdn
大佬:呵,csdn上面的东西你也信!
我:那为啥在mysql5不加阻塞也可以关联到广播流呢?
大佬:MySQL8加了很多安全校验的内容,初始化速度很慢,而且你那个5的多试几次你会发现有时候mysql流也比Kafka流慢
我:那这个主流阻塞为什么不行呀,看起来好像没毛病啊
大佬:你要搞清楚,你的这个process算子在运行时相当于一个单线程,所以你的操作相当于在单线程里面阻塞当前线程,你说有没有用?
我:Flink不是有并行度吗,一个任务不是可以多个线程一起执行吗?
大佬:唉!Flink的多并行内存是严格隔离的?一个task执行任务其实还是相当于单线程,打个比方多个task在执行你的那个process算子时,每个task都会阻塞。而且要用阻塞也不是在算子里面阻塞啊,要么就在主流初始化的时候阻塞。
我:那意思是在Kafka读取之后用map在阻塞一下就可以达到目的吗?
大佬:这可不一定,你得去算子链里面看下
我:(⊙o⊙)?强
最后,这个困扰我许久的问题终于得到解决。之后的第一感受就是应该转行了,Flink没有入门!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。