In the previous post, "Various Ways to Create Data Sources in Flink – Java and Scala Implementations", I covered the different ways Flink can create data sources. This post moves on to the second step of big data processing: transformations. As before, everything here is based on the official Flink documentation; if anything is unclear, the docs are the more authoritative reference: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/batch/dataset_transformations.html
This is just practice on my part to reinforce what I learned. A note on how the exercise is organized: each transformation lives in its own function, and the main functions below simply uncomment whichever example is to be run.
// Scala main function
def main(args: Array[String]): Unit = {
  val env = ExecutionEnvironment.getExecutionEnvironment
  // mapFunction(env)
  // filterFunction(env)
  // mapPartitionFunction(env)
  // firstFunction(env)
  // flatMapFunction(env)
  // distinctFunction(env)
  // joinFunction(env)
  // outerJoinFunction(env)
  crossFunction(env)
}

// Java main function
public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // mapFunction(env);
    // filterFunction(env);
    // mapPartitionFunction(env);
    // firstFunction(env);
    // flatMapFunction(env);
    // distinctFunction(env);
    // joinFunction(env);
    // outerJoinFunction(env);
    crossFunction(env);
}
// map: apply the given function to every element of the data set
def mapFunction (env: ExecutionEnvironment): Unit = {
  val data = env.fromCollection(List(1,2,3,4,5,6,7,8,9,10))
  // add 1 to every element of data
  // data.map((x:Int) => x+1).print()
  // data.map((x) => x+1).print()
  // data.map(x => x+1).print()
  data.map(_+1).print() // from top to bottom, the lambda can be simplified step by step
}
// map
public static void mapFunction(ExecutionEnvironment env) throws Exception {
    List<Integer> data = new ArrayList<Integer>();
    for (int i = 1; i <= 10; i++) {
        data.add(i);
    }
    DataSet<Integer> dataSet = env.fromCollection(data);
    dataSet.map(new MapFunction<Integer, Integer>() {
        public Integer map(Integer value) throws Exception {
            return value + 1;
        }
    }).print();
}
// filter: keep the elements for which the predicate returns true, drop the rest
def filterFunction(env: ExecutionEnvironment): Unit = {
  val data = env.fromCollection(List(1,2,3,4,5,6,7,8,9,10))
  // after adding 1, keep only the elements >= 7 (the commented-out line keeps the even ones instead)
  data.map(_+1)
    // .filter(_%2==0)
    .filter(x => x >= 7)
    .print()
}
// filter
public static void filterFunction(ExecutionEnvironment env) throws Exception {
    List<Integer> data = new ArrayList<Integer>();
    for (int i = 1; i <= 10; i++) {
        data.add(i);
    }
    DataSet<Integer> dataSet = env.fromCollection(data);
    dataSet.map(new MapFunction<Integer, Integer>() {
        public Integer map(Integer value) throws Exception {
            return value + 1;
        }
    }).filter(new FilterFunction<Integer>() {
        @Override
        public boolean filter(Integer value) throws Exception {
            return value > 5;
        }
    })
    .print();
    System.out.println("filter function: kept the values greater than 5");
}
// mapPartition: apply the function once per partition instead of once per element
def mapPartitionFunction (env: ExecutionEnvironment): Unit = {
  val students = new ListBuffer[String]
  for (x <- 1 to 100) {
    students.append("student: " + x)
  }
  val data = env.fromCollection(students).setParallelism(5)

  // With map, every single record acquires its own database connection;
  // with tens of millions of records the database would quickly be overwhelmed
  data.map(x => {
    // acquire a database connection
    val connection = DBUtils.getConnection()
    println(connection + "....")
    /* with the connection in hand, do the actual database work here (omitted) */
    // return the connection once the work is done
    DBUtils.returnConnection(connection)
  })

  // switch to mapPartition: one connection per partition
  data.mapPartition(x => {
    val connection = DBUtils.getConnection()
    println(connection)
    DBUtils.returnConnection(connection)
    x
  }).print()
}
// mapPartition
public static void mapPartitionFunction(ExecutionEnvironment env) throws Exception {
    List<String> students = new ArrayList<>();
    for (int x = 0; x < 100; x++) {
        students.add("student : " + x);
    }
    DataSet<String> data = env.fromCollection(students).setParallelism(5);
    data.map(new MapFunction<String, String>() {
        @Override
        public String map(String s) throws Exception {
            String connection = DBUtils.getConnection();
            System.out.println("connection: " + connection);
            DBUtils.returnConnection(connection);
            return s;
        }
    });
    // compare map with mapPartition to see which one acquires more database connections
    data.mapPartition(new MapPartitionFunction<String, String>() {
        @Override
        public void mapPartition(Iterable<String> s, Collector<String> collector) throws Exception {
            String connection = DBUtils.getConnection();
            System.out.println("connection: " + connection);
            DBUtils.returnConnection(connection);
        }
    }).print();
}
// flatMap: split each record on the given separator and emit the pieces
def flatMapFunction (env: ExecutionEnvironment): Unit = {
  val list = new ListBuffer[String]()
  list.append("Hadoop,Spark")
  list.append("Spark,Flink")
  list.append("Hadoop,Flink")
  val data = env.fromCollection(list)
  data.flatMap(x => x.split(",")).print()
  println("~~~~~~~~~~~~~~")

  // Flink word count -- slightly different from Spark
  data.flatMap(_.split(","))
    .map(x => (x, 1))
    .groupBy(0)
    .sum(1)
    .print()
}
// flatMap
public static void flatMapFunction (ExecutionEnvironment env) throws Exception {
    List<String> info = new ArrayList<>();
    info.add("Hadoop,Spark");
    info.add("Spark,Flink");
    info.add("Hadoop,Flink");
    DataSource<String> dataSource = env.fromCollection(info);
    dataSource.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String s, Collector<String> collector) throws Exception {
            String[] splits = s.split(",");
            for (String split : splits) {
                collector.collect(split);
            }
        }
    })
    .map(new MapFunction<String, Tuple2<String,Integer>>() {
        @Override
        public Tuple2<String, Integer> map(String s) throws Exception {
            return new Tuple2<String,Integer>(s, 1);
        }
    })
    .groupBy(0)
    .sum(1)
    .print();
}
// first: take the first n records
def firstFunction (env: ExecutionEnvironment): Unit = {
  val info = ListBuffer[(Int,String)]()
  info.append((1,"Hadoop"))
  info.append((1,"Spark"))
  info.append((1,"Flink"))
  info.append((2,"Scala"))
  info.append((2,"Java"))
  info.append((2,"Python"))
  info.append((3,"Linux"))
  info.append((3,"Window"))
  info.append((3,"MacOS"))
  val data = env.fromCollection(info)
  // take the first 3 records of the whole data set
  // data.first(3).print()
  // group by the first column, then take the first 2 records of each group
  // data.groupBy(0).first(2).print()
  // group, sort each group by the second column in descending order, then take the first 2
  data.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print()
}
// first
public static void firstFunction(ExecutionEnvironment env) throws Exception {
    List<Tuple2<Integer,String>> info = new ArrayList<>();
    info.add(new Tuple2(1,"Hadoop"));
    info.add(new Tuple2(1,"Spark"));
    info.add(new Tuple2(1,"Flink"));
    info.add(new Tuple2(2,"Scala"));
    info.add(new Tuple2(2,"Java"));
    info.add(new Tuple2(2,"Python"));
    info.add(new Tuple2(3,"Linux"));
    info.add(new Tuple2(3,"Window"));
    info.add(new Tuple2(3,"MacOS"));
    DataSet<Tuple2<Integer,String>> dataSet = env.fromCollection(info);
    dataSet.first(3).print();
    System.out.println("~~~~~~~~~~~~~~~~~~~");
    dataSet.groupBy(0).first(2).print();
    System.out.println("~~~~~~~~~~~~~~~~~~~");
    dataSet.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print();
}
// distinct: remove duplicate records
def distinctFunction(env: ExecutionEnvironment): Unit = {
  val list = new ListBuffer[String]()
  list.append("Hadoop,Spark")
  list.append("Spark,Flink")
  list.append("Hadoop,Flink")
  val data = env.fromCollection(list)
  data.flatMap(_.split(","))
    .distinct()
    .print()
}
// distinct
public static void distinctFunction (ExecutionEnvironment env) throws Exception {
    List<String> info = new ArrayList<>();
    info.add("Hadoop,Spark");
    info.add("Spark,Flink");
    info.add("Hadoop,Flink");
    DataSource<String> dataSource = env.fromCollection(info);
    dataSource.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String s, Collector<String> collector) throws Exception {
            String[] splits = s.split(",");
            for (String split : splits) {
                collector.collect(split);
            }
        }
    }).distinct().print();
}
// join: using the column positions given to where and equalTo as the key,
// emit the records from the two data sets whose keys match
def joinFunction (env: ExecutionEnvironment): Unit = {
  val info1 = ListBuffer[(Int,String)]()
  info1.append((1,"可达鸭"))
  info1.append((2,"ylqdh"))
  info1.append((3,"皮卡丘"))
  info1.append((4,"鲤鱼王"))
  val info2 = ListBuffer[(Int,String)]()
  info2.append((1,"深圳"))
  info2.append((2,"广州"))
  info2.append((3,"上海"))
  info2.append((4,"杭州"))
  val data1 = env.fromCollection(info1)
  val data2 = env.fromCollection(info2)

  data1.join(data2).where(0).equalTo(0)
    .apply((x, y) => {
      (x._1, x._2, y._2)
    })
    .print()
}
// join
public static void joinFunction(ExecutionEnvironment env) throws Exception {
    List<Tuple2<Integer,String>> info1 = new ArrayList<>();
    info1.add(new Tuple2<>(1,"可达鸭"));
    info1.add(new Tuple2<>(2,"ylqdh"));
    info1.add(new Tuple2<>(3,"皮卡丘"));
    info1.add(new Tuple2<>(4,"鲤鱼王"));
    List<Tuple2<Integer,String>> info2 = new ArrayList<>();
    info2.add(new Tuple2<>(1,"深圳"));
    info2.add(new Tuple2<>(2,"广州"));
    info2.add(new Tuple2<>(3,"上海"));
    info2.add(new Tuple2<>(5,"杭州"));
    DataSource<Tuple2<Integer,String>> data1 = env.fromCollection(info1);
    DataSource<Tuple2<Integer,String>> data2 = env.fromCollection(info2);
    data1.join(data2).where(0).equalTo(0)
        .with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() {
            @Override
            public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                return new Tuple3<Integer,String,String>(first.f0, first.f1, second.f1);
            }
        })
        .print();
}
// Outer join: the same idea as LEFT JOIN, RIGHT JOIN and FULL JOIN in SQL
// leftOuterJoin: like join, but unmatched records of the left data set are also emitted, with null on the right side
// rightOuterJoin: like join, but unmatched records of the right data set are also emitted, with null on the left side
// fullOuterJoin: like join, but unmatched records of either data set are also emitted, with null on the other side
def outerJoinFunction (env: ExecutionEnvironment): Unit = {
  val info1 = ListBuffer[(Int,String)]()
  info1.append((1,"可达鸭"))
  info1.append((2,"ylqdh"))
  info1.append((3,"皮卡丘"))
  info1.append((4,"鲤鱼王"))
  val info2 = ListBuffer[(Int,String)]()
  info2.append((1,"深圳"))
  info2.append((2,"广州"))
  info2.append((3,"上海"))
  info2.append((5,"杭州"))
  val data1 = env.fromCollection(info1)
  val data2 = env.fromCollection(info2)

  println("~~~~~~~left outer join~~~~~~~~~~")
  // leftOuterJoin
  data1.leftOuterJoin(data2).where(0).equalTo(0)
    .apply((x, y) => {
      if (y == null) {
        (x._1, x._2, "--")
      } else {
        (x._1, x._2, y._2)
      }
    })
    .print()

  println("~~~~~~~~~~~right outer join~~~~~~~~~~~~~")
  // rightOuterJoin
  data1.rightOuterJoin(data2).where(0).equalTo(0)
    .apply((x, y) => {
      if (x == null) {
        (y._1, "--", y._2)
      } else {
        (x._1, x._2, y._2)
      }
    })
    .print()

  println("~~~~~~~~~full join~~~~~~~~~~~~")
  // fullOuterJoin
  data1.fullOuterJoin(data2).where(0).equalTo(0)
    .apply((x, y) => {
      if (x == null) {
        (y._1, "--", y._2)
      } else if (y == null) {
        (x._1, x._2, "--")
      } else {
        (x._1, x._2, y._2)
      }
    })
    .print()
}
// outer join
public static void outerJoinFunction (ExecutionEnvironment env) throws Exception {
    List<Tuple2<Integer,String>> info1 = new ArrayList<>();
    info1.add(new Tuple2<>(1,"可达鸭"));
    info1.add(new Tuple2<>(2,"ylqdh"));
    info1.add(new Tuple2<>(3,"皮卡丘"));
    info1.add(new Tuple2<>(4,"鲤鱼王"));
    List<Tuple2<Integer,String>> info2 = new ArrayList<>();
    info2.add(new Tuple2<>(1,"深圳"));
    info2.add(new Tuple2<>(2,"广州"));
    info2.add(new Tuple2<>(3,"上海"));
    info2.add(new Tuple2<>(5,"杭州"));
    DataSource<Tuple2<Integer,String>> data1 = env.fromCollection(info1);
    DataSource<Tuple2<Integer,String>> data2 = env.fromCollection(info2);

    System.out.println("~~~~~~~left outer join~~~~~~~~~~~~");
    // left outer join
    data1.leftOuterJoin(data2).where(0).equalTo(0)
        .with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() {
            @Override
            public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                if (second == null) {
                    return new Tuple3<>(first.f0, first.f1, "--");
                } else {
                    return new Tuple3<Integer,String,String>(first.f0, first.f1, second.f1);
                }
            }
        })
        .print();

    System.out.println("~~~~~~~~right outer join~~~~~~~~~~~~");
    // right outer join
    data1.rightOuterJoin(data2).where(0).equalTo(0)
        .with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() {
            @Override
            public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                if (first == null) {
                    return new Tuple3<>(second.f0, "--", second.f1);
                } else {
                    return new Tuple3<Integer,String,String>(first.f0, first.f1, second.f1);
                }
            }
        })
        .print();

    System.out.println("~~~~~~~~~full join~~~~~~~~~~~");
    // full outer join
    data1.fullOuterJoin(data2).where(0).equalTo(0)
        .with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() {
            @Override
            public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                if (second == null) {
                    return new Tuple3<>(first.f0, first.f1, "--");
                } else if (first == null) {
                    return new Tuple3<>(second.f0, "--", second.f1);
                } else {
                    return new Tuple3<Integer,String,String>(first.f0, first.f1, second.f1);
                }
            }
        })
        .print();
}
// cross: the Cartesian product of two data sets; the result size is (records in set 1) x (records in set 2)
// The Cartesian product can get very large, so use it with care in production. Here we model two teams
// playing a best-of-3: for each team, every score it could possibly end up with.
def crossFunction (env: ExecutionEnvironment): Unit = {
  val info1 = List[String]("team A","team B")
  val info2 = List[Int](0,1,2)
  val data1 = env.fromCollection(info1)
  val data2 = env.fromCollection(info2)
  data1.cross(data2).print()
}
// cross
public static void crossFunction (ExecutionEnvironment env) throws Exception {
    List<String> info1 = new ArrayList<>();
    info1.add("team A");
    info1.add("team B");
    List<Integer> info2 = new ArrayList<>();
    info2.add(0);
    info2.add(1);
    info2.add(2);
    DataSource<String> data1 = env.fromCollection(info1);
    DataSource<Integer> data2 = env.fromCollection(info2);
    data1.cross(data2).print();
}
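As a quick sanity check of the size formula above: with two teams and three possible scores, the cross produces 2 x 3 = 6 records, namely (team A,0), (team A,1), (team A,2), (team B,0), (team B,1), (team B,2), although print() may emit them in a different order.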
In the Java code, when a data source is created from the ExecutionEnvironment object (named env here), some examples receive it as a DataSet and others as a DataSource, for example:
// the filter example receives the source as a DataSet
DataSet<Integer> dataSet = env.fromCollection(data);
// the cross example receives the sources as DataSource
DataSource<String> data1 = env.fromCollection(info1);
DataSource<Integer> data2 = env.fromCollection(info2);
Why can either class be used to receive it?
The official documentation puts it this way:
Data sources create the initial data sets, such as from files or from Java collections.
A DataSource is the initial DataSet, so when the source is first created it can be received as either a DataSource or a DataSet. Once a transformation has been applied, however, the result can no longer be received as a DataSource, otherwise you get an error.
Scala, on the other hand, infers the type of the value automatically, so there is no explicit receiving type and the question does not come up.
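To make this concrete, here is a minimal standalone sketch of my own (not code from the examples above; the class name SourceTypeDemo is made up) showing that fromCollection can be received as either type, while the result of a map can only be received as a DataSet:

// Sketch: DataSource<T> is a subclass of DataSet<T>, so the initial source fits either declaration,
// but a transformed result is only a DataSet.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;

import java.util.Arrays;

public class SourceTypeDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Both declarations compile: fromCollection returns a DataSource, which "is a" DataSet.
        DataSource<Integer> asSource = env.fromCollection(Arrays.asList(1, 2, 3));
        DataSet<Integer> asDataSet = env.fromCollection(Arrays.asList(4, 5, 6));

        // After a transformation the result is no longer a DataSource,
        // so it has to be received as a DataSet.
        DataSet<Integer> mapped = asSource.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer value) {
                return value + 1;
            }
        });
        // DataSource<Integer> wrong = asSource.map(...);  // would not compile

        mapped.print();
        asDataSet.print();
    }
}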
The transformations highlighted in yellow are the ones I have not written examples for yet; I will add them later. Working through these, I found that most of Flink's operators are the same as Spark's, in both name and usage, and the remaining ones are very similar and easy to map across. I also noticed that the Java and Scala code for the same functionality is quite close; the difference is that Scala's functional style spares you from extending a class and implementing a method the way Java requires, which cuts down the amount of code considerably. The downside is that, once you have written a lot of it, reading Scala back means pausing to recall what each shorthand stands for. Ha, maybe this is a case of there being no best language, only the most suitable one.
The plan for the next post is the third step of big data processing: output, i.e. saving the processed data. Stay tuned.