
Flink WordCount Implementation (Java and Scala)


WordCount in Java

package com.flink.Java;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Created by Shi shuai RollerQing on 2019/12/16 15:44
 * WordCount implemented with a window operation:
 * every 1 second, aggregate the data received during the last 2 seconds.
 */
public class WordCount {
    public static void main(String[] args) throws Exception {
        // Read the port of the data source from the command-line arguments
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            System.err.println("No port set . Please use default port 9000");
            port = 6666;
        }

        String hostname = "hadoop01";
        // Initialize the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read data from the socket
        DataStreamSource<String> data = env.socketTextStream(hostname, port);
        // Start the computation:
        // emit one (word, 1) tuple per word
        SingleOutputStreamOperator<WordWithCount> pairWords = data.flatMap(
                new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String s, Collector<WordWithCount> out) throws Exception {
                        String[] splits = s.split(" ");
                        for (String word : splits) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                }
        );

        // Group the tuples by key ("word" is a field of the WordWithCount POJO)
        KeyedStream<WordWithCount, Tuple> grouped = pairWords.keyBy("word");

        // Apply the window operation.
        // It takes two important parameters: the window length and the slide interval.
        WindowedStream<WordWithCount, Tuple, TimeWindow> window = grouped.timeWindow(Time.seconds(2), Time.seconds(1));

        SingleOutputStreamOperator<WordWithCount> counts = window.sum("count");

//        window.reduce(new ReduceFunction<WordWithCount>() {
//            @Override
//            public WordWithCount reduce(WordWithCount value1, WordWithCount value2) throws Exception {
//                return new WordWithCount(value1.word, value1.count + value2.count);
//            }
//        });


        // Print the result
        counts.print().setParallelism(1);

        env.execute("WordCount");
    }

    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }


}


To test, start hadoop01 and run the command nc -lk 6666 to open a NetCat server on port 6666.
When the job is run from IDEA no arguments are passed, so the catch branch falls back to port 6666. Because the window is 2 seconds long and slides every 1 second, each word is counted by two overlapping windows, so a single input line appears in two consecutive window outputs.
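As an aside, ParameterTool can also take a default value directly, which makes the try/catch unnecessary. A minimal sketch against the same Flink 1.7 API (shown in Scala, like the jobs below):

import org.apache.flink.api.java.utils.ParameterTool

// getInt(key, default) falls back to 6666 when --port is not supplied
val port = ParameterTool.fromArgs(args).getInt("port", 6666)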

WordCount in Scala

Streaming WordCount implementation
package com.flink.demo01
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
/**
 * Created by Shi shuai RollerQing on 2019/12/16 19:03
 * Streaming WordCount in Scala,
 * implemented with a window operation.
 */
object WordCount_Scala {
  def main(args: Array[String]): Unit = {
    // Read the NetCat port from the arguments
    val port: Int = try {
      ParameterTool.fromArgs(args).getInt("port")
    } catch {
      case e: Exception =>
        System.err.println("No port set, using default port 6666")
        6666
    }

    // Get the execution environment (the context object)
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Read the data
    val data = env.socketTextStream("hadoop01", port)

    // This import is required: it provides the implicits (e.g. TypeInformation) that the Scala API needs
    import org.apache.flink.api.scala._

    // Parse the data and run the computation
    val words = data.flatMap(_.split("\\s+")) // split each line into individual words
    val tups = words.map(w => WordWithCount(w, 1)) // turn each word into a (word, 1) record

    val grouped = tups.keyBy("word") // group by the "word" field
    // val grouped = tups.keyBy(0) // alternative: group by field position

    val window = grouped.timeWindow(Time.seconds(2), Time.seconds(2)) // apply the window operation

    // val res = window.sum("count") // alternative: aggregate with the built-in sum
    val res = window.reduce((a, b) => WordWithCount(a.word, a.count + b.count))
    // Print the result
    res.print.setParallelism(1)
    // Start the execution
    env.execute("scala wordCount")


  }

  case class WordWithCount(word: String, count: Int)

}
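One detail worth noting: the slide interval above equals the window length, so this is effectively a tumbling window; the single-argument timeWindow overload would express the same thing (a sketch, same Flink 1.7 DataStream API):

val window = grouped.timeWindow(Time.seconds(2)) // tumbling 2-second window, no overlap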



Batch WordCount implementation
package com.flink.demo01

import org.apache.flink.api.java.operators.DataSink
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

/**
 * Created by Shi shuai RollerQing on 2019/12/16 19:29
 * Batch WordCount in Scala, using the DataSet API.
 */
object WordCountB_Scala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Get the input data
    val text: DataSet[String] = env.readTextFile("C:\\Users\\HP\\IdeaProjects\\sparkCore\\data\\test.txt")
    import org.apache.flink.api.scala._
    val counts = text.flatMap(_.toLowerCase.split(" ").filter(_.nonEmpty))
      .map((_, 1))
      .groupBy(0)
      .sum(1)
    counts.collect().foreach(println)
// counts.setParallelism(1).writeAsCsv("C:\\Users\\HP\\IdeaProjects\\sparkCore\\data\\csvwc", "\n", "\t")
//    val write: DataSink[(String, Int)] = counts.setParallelism(1).writeAsCsv("C:\\Users\\HP\\IdeaProjects\\sparkCore\\data\\csvwc", "\n", "\t")
  }
}


Why does counts.setParallelism(1).writeAsCsv("C:\\Users\\HP\\IdeaProjects\\sparkCore\\data\\csvwc", "\n", "\t") produce no output file when it is uncommented? Because in the DataSet API, writeAsCsv only registers a sink: the job is not submitted until env.execute() is called. collect() and print() trigger execution on their own, which is why the console output appears, but a file sink needs an explicit env.execute() after it.
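A minimal sketch of the fix, reusing the same output path from the commented-out line:

// writeAsCsv only declares the sink; env.execute() actually submits the job
counts.setParallelism(1).writeAsCsv("C:\\Users\\HP\\IdeaProjects\\sparkCore\\data\\csvwc", "\n", "\t")
env.execute("batch wordCount")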

Dependencies

<properties>   
        <scala.version>2.11.8</scala.version>
        <flink.version>1.7.2</flink.version>
    </properties>
<!-- java依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
<!--            <scope>provided</scope>-->
        </dependency>
<!-- scala依赖 -->
<!--flink 有界数据处理依赖-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!--flink 无界数据处理依赖-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>