Flink 简单的 WordCount 小demo

1.先从数据源获取数据
2.map 给每一个单词打上标记 1 flatmap
3.shuffle 使用keyBy 设置二元组里的key 以便下面reduce 同组处理的时候每个key都在一个组里
4.reduce 将每个元组里的 数值想加起来。

package com.day01;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


// 从socket消费数据,然后进行单词计数
public class Example1 {
    // 不要忘记抛出异常
    public static void main(String[] args) throws Exception {
        // 获取流执行环境(上下文)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


        // 从socket消费数据
        // nc -lk 9999
        // 输出String类型
        // `.setParallelism(1)`将`socketTextStream`算子的并行子任务的数量设置为1
        DataStreamSource<String> source = env.socketTextStream("localhost", 9999).setParallelism(1);
         // 从文件获取
         // DataStreamSource<String> source = env.readTextFile("/Users/code/flinktutorial0530/MyFlink/src/main/resources/word.txt").setParallelism(1);




        // map阶段
        // "hello world" => Tuple2.of("hello", 1), Tuple2.of("world", 1)
        // 一对多的转换,使用flatMap算子
        SingleOutputStreamOperator<Tuple2<String, Integer>> mappedStream = source.flatMap(new Tokenizer()).setParallelism(1);


        // shuffle阶段
        // 将相同单词的二元组分到一个组里面
        // KeyedStream<输入数据的泛型,key的泛型>
        // `r -> r.f0`,r是输入数据,将二元组的`f0`字段设置为key
        // 每来一条数据,就为这条数据设置一个key
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = mappedStream.keyBy(r -> r.f0);


        // reduce阶段
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyedStream.reduce(new WordCount()).setParallelism(1);


        // 输出
        result.print().setParallelism(1);


        // 提交任务
        env.execute();
    }


    // reduce的输入、输出和累加器的泛型一样
    // 第一条数据到来,直接作为累加器保存下来,然后输出累加器的值
    // 后面的数据到来,和累加器进行聚合,产生的新累加器覆盖旧的累加器,然后将新的累加器输出
    // 每个key都有自己独有的累加器
    public static class WordCount implements ReduceFunction<Tuple2<String, Integer>> {
        // 输入参数1:累加器
        // 输入参数2:输入数据
        // 返回值:新的累加器
        @Override
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> accumulator, Tuple2<String, Integer> in) throws Exception {
            // 聚合的逻辑
            return Tuple2.of(accumulator.f0, accumulator.f1 + in.f1);
        }
    }


    // FlatMapFunction<IN, OUT>
    public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        // 参数1:输入数据
        // 参数2:集合,用来收集向下游发送的数据
        @Override
        public void flatMap(String in, Collector<Tuple2<String, Integer>> out) throws Exception {
            // 切分字符串
            String[] words = in.split(" ");


            for (String word : words) {
                // 实例化一个二元组
                // `.collect`将要发送的数据收集到集合中,flink会自动向下游发送
                out.collect(Tuple2.of(word, 1));
            }
        }
    }
}