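【Getting Started with Flink (6)】State Management in Flink (Basics)

【Date】2022.06.08, Wednesday

【Title】【Getting Started with Flink (6)】State Management in Flink (Basics)

This column contains notes and mind maps for the Atguigu (尚硅谷) Flink course.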

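Contents

Introduction

1. Overview of State

2. Operator State

2.1 Overview

2.2 Data Structures

2.3 Code Example

3. Keyed State

3.1 Overview

3.2 Data Structures

3.3 Code Example

3.4 Comparison of the Two Kinds of State

3.5 Scaling Out (Changing the Parallelism)

4. State Backends

4.1 Overview

4.2 Different State Backends

4.3 Code Example

Overall Mind Map

References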


Introduction

This section introduces the basics of state management in Flink, including:

  • Overview and usage of Operator State and Keyed State
  • Configuring State Backends

1. Overview of State

 

2. Operator State

2.1 Overview

  

2.2 Data Structures

2.3 Code Example

Code:

package apitest.state;

import apitest.beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Collections;
import java.util.List;

public class StateTest1_OperatorState {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // Text stream from a socket
    DataStream<String> inputStream = env.socketTextStream("localhost", 7777);

    // Convert each line into a SensorReading
    DataStream<SensorReading> dataStream = inputStream.map(line -> {
      String[] fields = line.split(",");
      return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
    });

    // A stateful map that counts the records seen by the current subtask (partition)
    SingleOutputStreamOperator<Integer> resultStream = dataStream.map(new MyCountMapper());

    resultStream.print();

    env.execute();
  }

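  // Custom MapFunction whose counter is kept as operator state via ListCheckpointed.
  // Note: ListCheckpointed is deprecated in newer Flink releases; CheckpointedFunction is the recommended replacement.
  public static class MyCountMapper implements MapFunction<SensorReading, Integer>, ListCheckpointed<Integer> {
    // A local variable that serves as the operator state
    private Integer count = 0;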

    @Override
    public Integer map(SensorReading value) throws Exception {
      count++;
      return count;
    }

    @Override
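    public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception { // save state on checkpoint
      return Collections.singletonList(count);
    }

    @Override
    public void restoreState(List<Integer> state) throws Exception { // restore state on recovery
      // After a rescale one subtask may receive several entries, so add them all up
      for (Integer num : state) {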
        count += num;
      }
    }
  }
}

Input (typed into a local socket server on port 7777, e.g. one started with `nc -lk 7777`):

sensor_1,1547718199,35.8
sensor_1,1547718199,35.8
sensor_1,1547718199,35.8
sensor_1,1547718199,35.8
sensor_1,1547718199,35.8

Output:

1
2
3
4
5

3. Keyed State

3.1 Overview

 

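3.2 Data Structures

3.3 Code Example

Suppose we want a temperature alarm: whenever the temperature of the same sensor changes by 10 degrees or more between two consecutive readings, an alert is emitted. This is implemented with Keyed State plus flatMap.

Code: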

package apitest.state;

import apitest.beans.SensorReading;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StateTest3_KeyedStateApplicationCase {

  public static void main(String[] args) throws Exception {
    // Create the execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Set parallelism = 1
    env.setParallelism(1);
    // Read data from a socket
    DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
    // Convert each line into a SensorReading
    DataStream<SensorReading> dataStream = inputStream.map(line -> {
      String[] fields = line.split(",");
      return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
    });

    SingleOutputStreamOperator<Tuple3<String, Double, Double>> resultStream = dataStream.keyBy(SensorReading::getId).flatMap(new MyFlatMapper(10.0));

    resultStream.print();

    env.execute();
  }

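  // Raise an alarm when a sensor's temperature changes by at least the given threshold (10.0 here) between consecutive readings
  public static class MyFlatMapper extends RichFlatMapFunction<SensorReading, Tuple3<String, Double, Double>> {

    // Temperature-difference threshold for the alarm
    private final Double threshold;

    // The previous temperature, stored as keyed state (one value per sensor id)
    ValueState<Double> lastTemperature;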

    public MyFlatMapper(Double threshold) {
      this.threshold = threshold;
    }

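    @Override
    public void open(Configuration parameters) throws Exception {
      // Obtain the keyed state handle from the runtime context
      lastTemperature = getRuntimeContext().getState(new ValueStateDescriptor<Double>("last-temp", Double.class));
    }

    // No close() override is needed to "release" keyed state: the state backend manages it.
    // ValueState.clear() only removes the value for the current key, and there is no
    // meaningful current key in close(), so calling it there would not clear all sensors.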

    @Override
    public void flatMap(SensorReading value, Collector<Tuple3<String, Double, Double>> out) throws Exception {
      Double lastTemp = lastTemperature.value();
      Double curTemp = value.getTemperature();

      // If there is a previous temperature, check whether the difference reaches the threshold and alarm if so
      if (lastTemp != null) {
        if (Math.abs(curTemp - lastTemp) >= threshold) {
          out.collect(new Tuple3<>(value.getId(), lastTemp, curTemp));
        }
      }

      // Update the stored "previous temperature"
      lastTemperature.update(curTemp);
    }
  }
}
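To make the per-key behavior of ValueState concrete, here is a plain-Java model of MyFlatMapper that runs without Flink. It is only a sketch under an assumption: the `HashMap` stands in for the state backend, and the class name `KeyedAlarmModel` and its `process` method are hypothetical. In the real job Flink scopes `lastTemperature` to the current key automatically, which is what the lookup by sensor id imitates here.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model (no Flink) of what keyed ValueState gives MyFlatMapper:
// one "last temperature" slot per key. The HashMap stands in for the state backend.
class KeyedAlarmModel {
    private final double threshold;
    // key (sensor id) -> last temperature seen for that key
    private final Map<String, Double> lastTempByKey = new HashMap<>();

    KeyedAlarmModel(double threshold) {
        this.threshold = threshold;
    }

    // Mirrors flatMap: returns "id,lastTemp,curTemp" on alarm, or null otherwise.
    String process(String sensorId, double curTemp) {
        Double lastTemp = lastTempByKey.get(sensorId); // like lastTemperature.value()
        String alarm = null;
        if (lastTemp != null && Math.abs(curTemp - lastTemp) >= threshold) {
            alarm = sensorId + "," + lastTemp + "," + curTemp;
        }
        lastTempByKey.put(sensorId, curTemp); // like lastTemperature.update(curTemp)
        return alarm;
    }
}
```

Feeding `sensor_1` at 35.8, then `sensor_6` at 15.4, then `sensor_1` at 47.0 raises exactly one alarm, for sensor_1. The first sensor_6 reading is never compared with sensor_1's temperature, because each key has its own "last temperature".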

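3.4 Comparison of the Two Kinds of State

3.5 Scaling Out (Changing the Parallelism)

Keyed State and Operator State are redistributed differently when an operator is scaled out or in.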

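Because every piece of Keyed State belongs to a particular key, and every key is always assigned to exactly one operator subtask, Flink automatically migrates Keyed State among the parallel subtasks when the parallelism changes. Internally, keys are grouped into key groups, and whole key groups (together with their state) are reassigned to the new subtasks.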
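Keyed State can move automatically because Flink does not assign keys to subtasks directly: each key is first hashed into one of `maxParallelism` key groups, and contiguous ranges of key groups are then assigned to subtasks. The sketch below models this in plain Java, under stated assumptions: Flink applies a murmur hash to `key.hashCode()`, whereas this sketch uses `Math.floorMod(key.hashCode(), maxParallelism)`, so concrete group numbers differ from Flink's; the range formula is modeled on Flink's `KeyGroupRangeAssignment`; `KeyGroupModel` and its methods are hypothetical names.

```java
// Simplified model of how Flink maps keys -> key groups -> subtasks.
// ASSUMPTION: Flink murmur-hashes key.hashCode(); this sketch uses a plain
// floorMod instead, so concrete key-group numbers differ from Flink's.
class KeyGroupModel {

    // The key group depends only on the key and maxParallelism,
    // never on the current parallelism.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Contiguous ranges of key groups are assigned to subtasks
    // (modeled on Flink's KeyGroupRangeAssignment).
    static int subtaskForKeyGroup(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Full path: key -> key group -> subtask index in [0, parallelism).
    static int subtaskForKey(Object key, int maxParallelism, int parallelism) {
        return subtaskForKeyGroup(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
    }
}
```

With `maxParallelism = 128` (an example value) and parallelism 3, key groups 0 to 42 go to subtask 0, 43 to 85 to subtask 1, and 86 to 127 to subtask 2. Because a key's group never depends on the parallelism, rescaling only moves whole key groups, and each key's state travels with its group.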

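For a non-`KeyedStream`, the data flowing into each operator subtask can change along with the parallelism. As shown in the figure above, if an application originally runs with parallelism 2, the data is split into two parts that flow in parallel into two operator subtasks, and each subtask holds its own state. If the parallelism is changed to 3, the stream is split into three branches; if it is changed to 1, the branches merge into one. In both cases the existing Operator State has to be redistributed among the new subtasks. For list-style Operator State (such as `ListCheckpointed` above), Flink concatenates the lists from all old subtasks and splits them evenly among the new ones; union list state instead hands the full list to every subtask.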
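The even-split redistribution of list-style Operator State can be sketched in plain Java. This is a model, not Flink's code: `EvenSplitModel` is a hypothetical name, and the exact chunk boundaries (contiguous chunks, with the first `total % newParallelism` subtasks taking one extra element) are an assumption of this sketch. `restoreCount` repeats the loop from `MyCountMapper.restoreState`.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of "even-split" redistribution of list-style operator state.
// ASSUMPTION: chunks are contiguous, and the first (total % newParallelism)
// subtasks receive one extra element.
class EvenSplitModel {

    static List<List<Integer>> redistribute(List<List<Integer>> oldStates, int newParallelism) {
        // 1. Logically concatenate the lists snapshotted by all old subtasks.
        List<Integer> all = new ArrayList<>();
        for (List<Integer> s : oldStates) {
            all.addAll(s);
        }
        // 2. Split the concatenation into newParallelism sublists of near-equal size.
        List<List<Integer>> result = new ArrayList<>();
        int base = all.size() / newParallelism;
        int remainder = all.size() % newParallelism;
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(all.subList(start, start + size)));
            start += size;
        }
        return result;
    }

    // Same logic as MyCountMapper.restoreState: sum every entry handed to the subtask.
    static int restoreCount(List<Integer> state) {
        int count = 0;
        for (Integer num : state) {
            count += num;
        }
        return count;
    }
}
```

Scaling two subtasks that counted 3 and 5 records down to one subtask hands both entries to it, so the restored count is 8; this is why `restoreState` sums every entry instead of taking only the first. Scaling up to three subtasks instead gives one subtask an empty list, so its counter restarts at 0.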

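4. State Backends

4.1 Overview

4.2 Different State Backends

They differ mainly in where the working state and the checkpoints are stored:

  • MemoryStateBackend: working state lives as objects on the TaskManager's JVM heap; checkpoints are stored in the JobManager's memory. Suitable for local testing only.
  • FsStateBackend: working state lives on the TaskManager's JVM heap; checkpoints are written to a file system such as HDFS or a local path.
  • RocksDBStateBackend: working state is kept in an embedded RocksDB instance on the TaskManager's local disk; checkpoints are written to a remote file system. It supports state larger than memory and incremental checkpoints.

Since Flink 1.13 these three classes are deprecated in favor of HashMapStateBackend and EmbeddedRocksDBStateBackend, combined with a separately configured checkpoint storage.

4.3 Code Example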

package apitest.state;

import apitest.beans.SensorReading;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateTest4_FaultTolerance {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1. State backend configuration.
        // Only the last setStateBackend() call takes effect, so enable exactly one option.
        // The checkpoint URI must include a scheme (file://, hdfs://, ...);
        // "file:///tmp/flink/checkpoints" is only an example path.

        // Option A: state and checkpoints kept in memory (local testing only)
        // env.setStateBackend(new MemoryStateBackend());

        // Option B: state on the TaskManager heap, checkpoints on a file system
        // env.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));

        // Option C: state in embedded RocksDB, checkpoints on a file system
        // (requires the flink-statebackend-rocksdb dependency)
        env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink/checkpoints"));

        // Text stream from a socket
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);

        // Convert each line into a SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
        });

        dataStream.print();
        env.execute();
    }
}

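Overall Mind Map

References

1. Flink State Management Explained: An In-Depth Analysis of Keyed State and Operator List State (Flink状态管理详解:Keyed State和Operator List State深度解析)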