Flink SQL中的窗口函数

 

概述

Flink窗口函数是flink的重要特性,而Flink SQL API是Flink批流一体的封装,学习明白本节课,是对Flink学习的很大收益!

窗口函数

窗口函数Flink SQL支持基于无限大窗口的聚合(无需在SQL Query中,显式定义任何窗口)以及对一个特定的窗口的聚合。例如,需要统计在过去的1分钟内有多少用户点击了某个的网页,可以通过定义一个窗口来收集最近1分钟内的数据,并对这个窗口内的数据进行计算。

Flink SQL支持的窗口聚合主要是两种:Window聚合和Over聚合。本文档主要为您介绍Window聚合。Window聚合支持Event Time和Processing Time两种时间属性定义窗口。每种时间属性类型支持三种窗口类型:滚动窗口(TUMBLE)、滑动窗口(HOP)和会话窗口(SESSION)。

时间属性

Flink SQL支持以下两种时间属性。实时计算可以基于这两种时间属性对数据进行窗口聚合。

  • Event Time:您提供的事件时间(通常是数据的最原始的创建时间),Event Time一定是您提供在Schema里的数据。
  • Processing Time:对事件进行处理的本地系统时间。

概念性的东西不说了,参考:https://help.aliyun.com/document_detail/62510.html?spm=a2c4g.11186623.6.761.5778319bX6nU52

https://blog.csdn.net/huahuaxiaoshao/article/details/107520208

2 分组窗口

2.1 分组窗口的类型

      SQL查询的分组窗口是通过GROUP BY子句定义的。类似于使用常规GROUP BY语句的查询,窗口分组语句的GROUP BY子句中带有一个窗口函数为每个分组计算出一个结果。以下是批处理和流出理表支持的分组窗口函数:
(1)TUMBLE(time_attr, interval)
      定义一个滚动窗口。滚动窗口把行分配到有固定持续时间(interval)的不重叠的连续窗口。比如,5分钟的滚动窗口以5分钟为间隔对行进行分组。滚动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。
(2)HOP(time_attr, interval, interval)
      定义一个跳跃窗口(在Table API中成为滑动窗口)。滑动窗口有一个固定的持续时间(第二个interval参数)以及一个滑动的间隔(第一个interval参数)。若滑动间隔小于窗口的持续时间,滑动窗口则会出现重叠;因此,行将会被分配到多个窗口中。比如,一个大小为15分钟的滑动窗口,其滑动间隔为5分钟,将会把每一行数据分配到3个15分钟的窗口中。滑动窗口可以定义咋事件时间(批处理、流处理)或处理时间(流处理)上。
(3)SESSION(time_attr, interval)
      定义一个会话时间窗口。会话时间窗口没有一个固定的持续时间,但是它们的边界会根据interval所定义的不活跃时间所确定;即一个会话时间窗口在定义的间隔时间内没有新纪录出现,该窗口会被关闭。例如时间窗口的间隔时间是30分钟,当其不活跃的时间达到30分钟后,若观测到新的记录,则会启动一个新的会话时间窗口(否则该行数据会被添加到当前的窗口),且若在30分钟内没有观测到新纪录,这个窗口将会被关闭。会话时间窗口可以使用事件时间(批处理、流处理)或处理时间(流处理)。

2.2 时间属性

      在流处理表中的SQL查询中,分组窗口函数的time_attr参数必须引用一个合法的时间属性,且该属性需要指定行的处理时间或事件时间。
对于批处理的SQL查询,分组窗口函数的time_attr参数必须是一个TIMESTAMP类型的属性。

2.3 选择分组窗口的开始和结束时间戳(辅助函数)

(1)返回相对应的滚动、滑动和会话窗口的开始时间(包含边界)

TUMBLE_START(time_attr, interval)
HOP_START(time_attr, interval, interval)
SESSION_START(time_attr, interval)

(2)返回相对应的滚动、滑动和会话窗口的结束时间(包含边界)

TUMBLE_END(time_attr, interval)
HOP_END(time_attr, interval, interval)
SESSION_END(time_attr, interval)

      注意:返回的间戳不可以在随后基于事件的操作中,作为行时间属性使用,比如基于事件窗口的join以及分组窗口或分组窗口上的聚合。
(3)返回相对应的滚动、滑动和会话窗口的结束时间(不包含边界)

TUMBLE_ROWTIME(time_attr, interval)
HOP_ROWTIME(time_attr, interval, interval)
SESSION_ROWTIME(time_attr, interval)

      返回的是一个可用于后续需要基于时间的操作的时间属性(rowtime attribute),比如基于时间窗口的join以及分组窗口或分组窗口上的聚合。
(4)返回相对应的滚动、滑动和会话窗口的结束时间(不包含边界)

TUMBLE_PROCTIME(time_attr, interval)
HOP_PROCTIME(time_attr, interval, interval)
SESSION_PROCTIME(time_attr, interval)

      返回处理时间参数可用于后续需要基于事件的操作,比如基于时间窗口的join以及分组窗口或分组窗口上的聚合。
      注意:辅助函数必须使用与GROUP BY子句中的分组窗口函数完全相同的参数来调用。

2.4 实战-使用Flink读取kafka数据,以时间时间使用活动窗口统计top5的商品。

package com.atguigu.hotitems_analysis

import java.sql.Timestamp
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.table.api.{EnvironmentSettings, Slide}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.dataformat.BaseRow


object HotItemsWithSQL {
  def main(args: Array[String]): Unit = {
    // 创建一个流处理执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //val inputStream: DataStream[String] = env.readTextFile("D:\\\\AAA\\\\Flink\\\\UserBehaviorAnalysis\\HotItemsAnalysis\\src\\main\\resources\\UserBehavior.csv")
    // 将数据转换成样例类类型,并且提取timestamp定义watermark
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "")
    properties.setProperty("zookeeper.connect", "")
    properties.setProperty("group.id", "kafka2es")

    val inputStream: DataStream[String] = env.addSource( new FlinkKafkaConsumer[String]("student-test", new SimpleStringSchema(), properties) )

    // 这个是单词个数统计
//    val stream2 = inputStream.map(_.split("\\W+")).flatMap(_.toSeq).map((_, 1)).keyBy(0).sum(1)
//    stream2.addSink(tup=>{ //sink动作,出发action
//      println(tup._1+", count->  ",tup._2)
//    })


    val dataStream: DataStream[UserBehavior] = inputStream
      .map( data => {
        val dataArray = data.split(",")
        UserBehavior( dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong )
      } )
      .assignAscendingTimestamps(_.timestamp * 1000L)

    // 要调用Table API,先创建表执行环境
    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // 将DataStream注册成表,提取需要的字段,进行处理,这个表和类严格对应
    tableEnv.createTemporaryView("data_table", dataStream, 'itemId, 'behavior, 'timestamp.rowtime as 'ts)
    tableEnv.createTemporaryView("data_table_1", dataStream, 'itemId, 'behavior, 'timestamp.rowtime as 'ts)

     val  table02 = tableEnv.sqlQuery( "select count(behavior)  cu from  data_table_1" )
//    //table转换成流才能显示

    // 用SQL实现
    val resultTable = tableEnv.sqlQuery(
      """
        |select *
        |from (
        |    select *,
        |      row_number() over (partition by windowEnd order by cnt desc) as row_num
        |    from (
        |      select itemId,
        |          count(itemId) as cnt,
        |          hop_end(ts, interval '60' minute, interval '24' hour) as windowEnd
        |      from data_table
        |      where behavior = 'pv'
        |      group by hop(ts, interval '60' minute, interval '24' hour), itemId
        |    )
        |)
        |where row_num <= 5
      """.stripMargin)
    //使用缩进模式将动态表转化为数据流,它用true或false来标记数据的插入和撤回,返回true代表数据插入,false代表数据的撤回
    // 这个滑动步长是在拿到额?????
   // table02.toRetractStream[(BaseRow)].print()
    /*
    输出
    (false,(-|42000))
    (true,(+|42001))
    (false,(-|42001))
    (true,(+|42002))
    (false,(-|42002))
     */

    resultTable.toRetractStream[(Long, Long, Timestamp, Long)].print("results")
    /*
    输出
    results> (true,(2453685,4,2017-11-26 07:00:00.0,4))
    results> (false,(1871901,4,2017-11-26 07:00:00.0,4))
     */

    /*

    1. 将env的streamgraph转化为jobgraph

    2.  设置任务运行的配置信息configuration

    3.  启动LocalFlinkMiniCluster

    4.  提交jobgraph到Cluster"hot item with sql")

     */

    env.execute("hot item with table api & sql")
  }
}

这里使用到的开窗函数是:

hop_end(ts, interval '60' minute, interval '24' hour) as windowEnd

滑动窗口(HOP),也被称作Sliding Window。不同于滚动窗口,滑动窗口的窗口可以重叠。

滑动窗口有两个参数:slide和size。slide为每次滑动的步长,size为窗口的大小。

  • slide < size,则窗口会重叠,每个元素会被分配到多个窗口。
  • slide = size,则等同于滚动窗口(TUMBLE)。
  • slide > size,则为跳跃窗口,窗口之间不重叠且有间隙。

通常,大部分元素符合多个窗口情景,窗口是重叠的。因此,滑动窗口在计算移动平均数(moving averages)时很实用。例如,计算过去5分钟数据的平均值,每10秒钟更新一次,可以设置slide为10秒,size为5分钟。下图为您展示间隔为30秒,窗口大小为1分钟的滑动窗口。

 

table转数据流打印

1.Flink处理数据把表转换为流的时候,可以使用toAppendStream与toRetractStream,前者适用于数据追加的场景, 后者适用于更新,删除场景

2.FlinkSQL中可以使用我们自定义的函数.Flink UDF自定义函数实现:evaluation方法必须定义为public,命名为eval。evaluation方法的输入参数类型和返回值类型决定着函数的输入参数类型和返回值类型。evaluation方法也可以被重载实现多个eval。同时evaluation方法支持变参数,例如:eval(String... strs)

下边是个例子

今天在Flink 1.7.2版本上跑一个Flink SQL 示例 RetractPvUvSQL,报

 

Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 19 to line 1, column 51: Cannot apply 'DATE_FORMAT' to arguments of type 'DATE_FORMAT(<VARCHAR(65536)>, <CHAR(2)>)'. Supported form(s): '(TIMESTAMP, FORMAT)'

从提示看应该是不支持参数为字符串,接下来我们自定义一个UDF函数来支持这种场景。

RetractPvUvSQL 代码

public class RetractPvUvSQL {

    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        DataStreamSource<PageVisit> input = env.fromElements(
                new PageVisit("2017-09-16 09:00:00", 1001, "/page1"),
                new PageVisit("2017-09-16 09:00:00", 1001, "/page2"),

                new PageVisit("2017-09-16 10:30:00", 1005, "/page1"),
                new PageVisit("2017-09-16 10:30:00", 1005, "/page1"),
                new PageVisit("2017-09-16 10:30:00", 1005, "/page2"));

        // register the DataStream as table "visit_table"
        tEnv.registerDataStream("visit_table", input, "visitTime, userId, visitPage");
        
        Table table = tEnv.sqlQuery(
                "SELECT " +
                        "visitTime, " +
                        "DATE_FORMAT(max(visitTime), 'HH') as ts, " +
                        "count(userId) as pv, " +
                        "count(distinct userId) as uv " +
                        "FROM visit_table " +
                        "GROUP BY visitTime");
        DataStream<Tuple2<Boolean, Row>> dataStream = tEnv.toRetractStream(table, Row.class);

        if (params.has("output")) {
            String outPath = params.get("output");
            System.out.println("Output path: " + outPath);
            dataStream.writeAsCsv(outPath);
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            dataStream.print();
        }
        env.execute();
    }

    /**
     * Simple POJO containing a website page visitor.
     */
    public static class PageVisit {
        public String visitTime;
        public long userId;
        public String visitPage;

        // public constructor to make it a Flink POJO
        public PageVisit() {
        }

        public PageVisit(String visitTime, long userId, String visitPage) {
            this.visitTime = visitTime;
            this.userId = userId;
            this.visitPage = visitPage;
        }

        @Override
        public String toString() {
            return "PageVisit " + visitTime + " " + userId + " " + visitPage;
        }
    }
}

UDF实现

实现参数为字符串的日期解析

public class DateFormat extends ScalarFunction {

    public String eval(Timestamp t, String format) {
        return new SimpleDateFormat(format).format(t);
    }

    /**
     * 默认日期格式:yyyy-MM-dd HH:mm:ss
     *
     * @param t
     * @param format
     * @return
     */
    public static String eval(String t, String format) {
        try {
            Date originDate = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(t);
            return new SimpleDateFormat(format).format(originDate);
        } catch (ParseException e) {
            throw new RuntimeException("日期:" + t + "解析为格式" + format + "出错");
        }
    }
}

因为flink 已经内置DATE_FORMAT函数,这里我们改个名字:DATEFORMAT

//register the function
  tEnv.registerFunction("DATEFORMAT", new DateFormat());
    
    
   Table table = tEnv.sqlQuery(
        "SELECT " +
                "visitTime, " +
                "DATEFORMAT(max(visitTime), 'HH') as ts, " +
                "count(userId) as pv, " +
                "count(distinct userId) as uv " +
                "FROM visit_table " +
                "GROUP BY visitTime");

从UDF函数注册源码看,自定义函数在Table API或SQL API 都可以使用。

 /**
    * Registers a [[ScalarFunction]] under a unique name. Replaces already existing
    * user-defined functions under this name.
    */
  def registerFunction(name: String, function: ScalarFunction): Unit = {
    // check if class could be instantiated
    checkForInstantiation(function.getClass)

    // register in Table API

    functionCatalog.registerFunction(name, function.getClass)

    // register in SQL API
    functionCatalog.registerSqlFunction(
      createScalarSqlFunction(name, name, function, typeFactory)
    )
  }

执行的结果:

printing result to stdout. Use --output to specify output path.
6> (true,2017-09-16 10:30:00,10,1,1)
4> (true,2017-09-16 09:00:00,09,1,1)
4> (false,2017-09-16 09:00:00,09,1,1)
6> (false,2017-09-16 10:30:00,10,1,1)
4> (true,2017-09-16 09:00:00,09,2,1)
6> (true,2017-09-16 10:30:00,10,2,1)
6> (false,2017-09-16 10:30:00,10,2,1)
6> (true,2017-09-16 10:30:00,10,3,1)

Process finished with exit code 0

我们看下这个结果是什么意思:

Flink RetractStream 用true或false来标记数据的插入和撤回,返回true代表数据插入,false代表数据的撤回,在网上看到一个图很直观地说明RetractStream 为什么存在?

数据倾斜相关配置

在使用实时计算产品时,如果遇到数据倾斜问题,可以增加以下配置,即可解决,不需要手动进行SQL优化。

  1. # 开启5秒的microbatch

  2. blink.microBatch.allowLatencyMs=5000

  3. blink.miniBatch.allowLatencyMs=5000

  4. blink.miniBatch.size=20000

  5. # Local 优化,默认已经开启

  6. # blink.localAgg.enabled=true

  7. # 开启 Partial 优化,解决count distinct热点

  8. blink.partialAgg.enabled=true

  9. # Incremental 优化,默认已经开启

  10. # blink.incrementalAgg.enabled=true