Flink 简单的 WordCount 小demo
1.先从数据源获取数据
2.map 给每一个单词打上标记 1 flatmap
3.shuffle 使用keyBy 设置二元组里的key 以便下面reduce 同组处理的时候每个key都在一个组里
4.reduce 将每个元组里的 数值想加起来。
package com.day01;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
// 从socket消费数据,然后进行单词计数
public class Example1 {
// 不要忘记抛出异常
public static void main(String[] args) throws Exception {
// 获取流执行环境(上下文)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从socket消费数据
// nc -lk 9999
// 输出String类型
// `.setParallelism(1)`将`socketTextStream`算子的并行子任务的数量设置为1
DataStreamSource<String> source = env.socketTextStream("localhost", 9999).setParallelism(1);
// 从文件获取
// DataStreamSource<String> source = env.readTextFile("/Users/code/flinktutorial0530/MyFlink/src/main/resources/word.txt").setParallelism(1);
// map阶段
// "hello world" => Tuple2.of("hello", 1), Tuple2.of("world", 1)
// 一对多的转换,使用flatMap算子
SingleOutputStreamOperator<Tuple2<String, Integer>> mappedStream = source.flatMap(new Tokenizer()).setParallelism(1);
// shuffle阶段
// 将相同单词的二元组分到一个组里面
// KeyedStream<输入数据的泛型,key的泛型>
// `r -> r.f0`,r是输入数据,将二元组的`f0`字段设置为key
// 每来一条数据,就为这条数据设置一个key
KeyedStream<Tuple2<String, Integer>, String> keyedStream = mappedStream.keyBy(r -> r.f0);
// reduce阶段
SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyedStream.reduce(new WordCount()).setParallelism(1);
// 输出
result.print().setParallelism(1);
// 提交任务
env.execute();
}
// reduce的输入、输出和累加器的泛型一样
// 第一条数据到来,直接作为累加器保存下来,然后输出累加器的值
// 后面的数据到来,和累加器进行聚合,产生的新累加器覆盖旧的累加器,然后将新的累加器输出
// 每个key都有自己独有的累加器
public static class WordCount implements ReduceFunction<Tuple2<String, Integer>> {
// 输入参数1:累加器
// 输入参数2:输入数据
// 返回值:新的累加器
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> accumulator, Tuple2<String, Integer> in) throws Exception {
// 聚合的逻辑
return Tuple2.of(accumulator.f0, accumulator.f1 + in.f1);
}
}
// FlatMapFunction<IN, OUT>
public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
// 参数1:输入数据
// 参数2:集合,用来收集向下游发送的数据
@Override
public void flatMap(String in, Collector<Tuple2<String, Integer>> out) throws Exception {
// 切分字符串
String[] words = in.split(" ");
for (String word : words) {
// 实例化一个二元组
// `.collect`将要发送的数据收集到集合中,flink会自动向下游发送
out.collect(Tuple2.of(word, 1));
}
}
}
}