大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动&基于事件驱动

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动&基于事件驱动

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(已更完)

Flink(正在更新!)

章节内容

上节我们完成了如下的内容:


Flink DataSet

Flink DataSet 转换操作

Flink DataSet 输出

容错机制、对比、发展方向

Flink Window 背景

Flink认为Batch是Streaming的一个特例,因此Flink底层引擎是一个流式引擎,去上面实现了流处理和批处理,而Window就是从Streaming到Batch的桥梁。


通俗讲,Window是用来对一个无限的流的设置一个有限的集合,从而有界数据集上进行操作的一种机制,流上的集合由Window来划定范围,比如“计算过去10分钟”或者“最后50个元素的和”。

Window可以由时间(TimeWindow)比如30秒或者数据,(CountWindow)比如100个元素驱动。

DataStreamAPI提供了Time和Count的Window。


Flink Window 总览

基本概念

Window 是Flink处理无限流的核心,Windows将流拆分为有限大小“桶”,我们可以在其上应用计算。

Flink 认为Batch是Streaming的一个特例,所以Flink底层引擎是一个流式引擎,在上面实现了流处理和批处理。

而Window窗口是从Streaming到Batch的一个桥梁。

Flink提供了非常完善的窗口机制

在流处理中,数据是连续不断的,因此我们不可能等到所有等到所有数据都到了再开始处理。

当然我们可以每来一个消息就处理一次,但是有时候我们需要做一些聚合操作,例如:在过去一分钟内有多少用户点击了我们的网页

在这种情况下,我们必须定义一个窗口,用来收集最近的一分钟内的数据,并对这个窗口的内数据进行计算

窗口可以基于时间驱动、也可以基于事件驱动

同样基于不同事件驱动的可以分为:翻滚窗口(TumblingWindow 无重叠)、滑动窗口(Sliding Window 有重叠)、会话窗口(SessionWindow 活动间隙)、全局窗口

Flink要操作窗口,先要将StreamSource转换成WindowedStream

转换步骤

获取流数据源

获取窗口

操作窗口数据

输出窗口数据

滚动时间窗口

类型特点

将数据依据固定的窗口长度对数据进行切分:


时间对齐

窗口长度固定,没有重叠

Flink 的滚动时间窗口(Tumbling Window)是一种常见的基于时间的窗口机制,可以通过事件驱动进行计算。滚动窗口的特点是时间窗口是固定长度的,窗口之间没有重叠,每个事件只能进入一个窗口。


在 Flink 中,滚动时间窗口可以基于事件时间(Event Time)或者处理时间(Processing Time)来定义。为了基于事件时间驱动,可以使用 EventTimeSessionWindows 或者 TumblingEventTimeWindows 来进行定义。


关键点

事件时间和水印 (Watermark): 通过 assignTimestampsAndWatermarks 来指定事件时间,并使用水印确保窗口计算不会遗漏延迟的事件。

窗口定义: 使用 TumblingEventTimeWindows.of(Time.seconds(x)) 定义滚动窗口。窗口长度为 x 秒。

触发器: 采用 EventTimeTrigger 触发计算,确保窗口是基于事件时间的。

基于时间驱动

场景:我们需要统计每一分钟用户购买商品的总数,需要将用户的行为事件按每一分钟进行切分,这种切分被叫做 翻滚时间窗口(Tumbling Time Window)。

启动的主类:

package icu.wzk;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.text.SimpleDateFormat;
import java.util.Random;


public class TumblingWindow {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataStreamSource = env.getJavaEnv().socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        long timeMillis = System.currentTimeMillis();
                        int random = new Random().nextInt(10);
                        System.out.println("value: " + value + ", random: " + random + ", time: " + format.format(timeMillis));
                        return Tuple2.of(value, random);
                    }
                });

        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream
                .keyBy(new KeySelector<Tuple2<String, Integer>, Tuple>() {
                    @Override
                    public Tuple getKey(Tuple2<String, Integer> value) throws Exception {
                        return Tuple1.of(value.f0);
                    }
                });

        // 基于时间驱动 每隔 10秒 划分一个窗口
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow = keyedStream
                .timeWindow(Time.seconds(10));
        timeWindow.apply(new MyTimeWindowFunction()).print();
        env.execute("TumblingWindow");

    }

}

我们实现一个 MyTimeWindowFunction,滚动时间窗口:

package icu.wzk;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;

public class MyTimeWindowFunction implements WindowFunction<Tuple2<String, Integer>, String, Tuple, TimeWindow> {

    /**
     * 场景:我们需要统计每一分钟用户购买商品的总数,需要将用户的行为事件按每一分钟进行切分,这种切分被叫做 翻滚时间窗口(Tumbling Time Window)
     * @author wzk
     * @date 16:58 2024/7/26

    **/
    @Override
    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        int sum = 0;
        for (Tuple2<String, Integer> tuple2 : input) {
            sum += tuple2.f1;
        }
        out.collect("key: " + tuple.getField(0) + ", value: " + sum  +
                ", window start: " + format.format(window.getStart()) + ", window end: " + format.format(window.getEnd()));
    }
}

基于事件驱动

场景:当我们想要每100个用户的购买行为作为驱动,那么每当窗口中填满了100个“相同”元素,就会对窗口进行计算。

编写一个启动类:

package icu.wzk;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.text.SimpleDateFormat;
import java.util.Random;


public class TumblingWindow {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataStreamSource = env.getJavaEnv().socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        long timeMillis = System.currentTimeMillis();
                        int random = new Random().nextInt(10);
                        System.out.println("value: " + value + ", random: " + random + ", time: " + format.format(timeMillis));
                        return Tuple2.of(value, random);
                    }
                });

        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream
                .keyBy(new KeySelector<Tuple2<String, Integer>, Tuple>() {
                    @Override
                    public Tuple getKey(Tuple2<String, Integer> value) throws Exception {
                        return Tuple1.of(value.f0);
                    }
                });

        // 基于时间驱动 每隔 10秒 划分一个窗口
        WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> globalWindow = keyedStream
                .countWindow(3);
        globalWindow.apply(new MyCountWindowFuntion());
        env.execute("TumblingWindow");

    }

}

编写一个事件驱动的类:MyCountWindowFuntion

package icu.wzk;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;


import java.text.SimpleDateFormat;


public class MyCountWindowFuntion implements WindowFunction<Tuple2<String, Integer>, String, Tuple, GlobalWindow> {

    /**
     * 场景:当我们想要每100个用户的购买行为作为驱动,那么每当窗口中填满了100个“相同”元素,就会对窗口进行计算。
     * @author wzk
     * @date 17:11 2024/7/26
    **/
    @Override
    public void apply(Tuple tuple, GlobalWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        int sum = 0;
        for (Tuple2<String, Integer> tuple2 : input) {
            sum += tuple2.f1;
        }
        // 无用的时间戳:默认值是:Long.MAX_VALUE,在事件驱动下,基于计数的情况,不关心时间
        long maxTimestamp = window.maxTimestamp();
        out.collect("key:" + tuple.getField(0) + ", value: " + sum + ", maxTimestamp :"
                + maxTimestamp + "," + format.format(maxTimestamp));
    }

}


相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
目录
相关文章
|
4月前
|
机器学习/深度学习 存储 分布式计算
ODPS驱动电商仓储革命:动态需求预测系统的落地实践
本方案基于ODPS构建“预测-仿真-决策”闭环系统,解决传统仓储中滞销积压与爆款缺货问题。通过动态特征工程、时空融合模型与库存仿真引擎,实现库存周转天数下降42%,缺货率下降65%,年损减少5000万以上,显著提升运营效率与GMV。
351 1
|
2月前
|
人工智能 运维 Java
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
本文基于Apache Flink PMC成员宋辛童在Community Over Code Asia 2025的演讲,深入解析Flink Agents项目的技术背景、架构设计与应用场景。该项目聚焦事件驱动型AI智能体,结合Flink的实时处理能力,推动AI在工业场景中的工程化落地,涵盖智能运维、直播分析等典型应用,展现其在AI发展第四层次——智能体AI中的重要意义。
908 27
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
788 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
6月前
|
机器学习/深度学习 运维 大数据
大数据如何驱动智能制造的升级与蜕变?
大数据如何驱动智能制造的升级与蜕变?
123 12
|
10月前
|
SQL 存储 大数据
Flink 基础详解:大数据处理的强大引擎
Apache Flink 是一个分布式流批一体化的开源平台,专为大规模数据处理设计。它支持实时流处理和批处理,具有高吞吐量、低延迟特性。Flink 提供统一的编程抽象,简化大数据应用开发,并在流处理方面表现卓越,广泛应用于实时监控、金融交易分析等场景。其架构包括 JobManager、TaskManager 和 Client,支持并行度、水位线、时间语义等基础属性。Flink 还提供了丰富的算子、状态管理和容错机制,如检查点和 Savepoint,确保作业的可靠性和一致性。此外,Flink 支持 SQL 查询和 CDC 功能,实现实时数据捕获与同步,广泛应用于数据仓库和实时数据分析领域。
6169 32
zdl
|
12月前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
485 56
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
331 1
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
248 1
|
消息中间件 分布式计算 Kafka
大数据平台的毕业设计02:Spark与实时计算
大数据平台的毕业设计02:Spark与实时计算
258 0
|
SQL 运维 大数据
大数据实时计算产品的对比测评
在使用多种Flink实时计算产品后,我发现Flink凭借其流批一体的优势,在实时数据处理领域表现出色。它不仅支持复杂的窗口机制与事件时间处理,还具备高效的数据吞吐能力和精准的状态管理,确保数据处理既快又准。此外,Flink提供了多样化的编程接口和运维工具,简化了开发流程,但在界面友好度上还有提升空间。针对企业级应用,Flink展现了高可用性和安全性,不过价格因素可能影响小型企业的采纳决策。未来可进一步优化文档和自动化调优工具,以提升用户体验。
330 0

热门文章

最新文章