大数据流试计算引擎Flink篇

奋斗吧
奋斗吧
擅长邻域:未填写

标签: 大数据流试计算引擎Flink篇 代码人生博客 51CTO博客

2023-07-17 18:24:09 216浏览

大数据流试计算引擎Flink篇,大数据流试计算引擎Flink篇


文章目录

  • Flink
  • 1.Flink核心概念简介
  • 1.1.什么是Flink
  • 1.2.无界数据和有界数据简介
  • 1.3.Flink优化利用内存性能
  • 1.4.Flink常见的应用场景
  • 2.Flink案例入门
  • 2.1.创建Maven项目整合Flink
  • 2.2.Tuple数据类型操作
  • 2.3.Flink流式处理案例
  • 2.4.Flink运行流程解析
  • 2.5.Flink可视化控制台
  • 3.Flink部署和整体架构讲解
  • 3.1.Flink运行流程简介
  • 3.2.Flink组件角色介绍
  • 3.3.Flink并行度和优先级概念
  • 4.Flink里的Source Operator实战
  • 4.1.Source Operator速览
  • 4.2.元素集合类型Source
  • 4.3.文件/文件系统Source
  • 4.4.基于Socket的Source
  • 4.5.自定义Source
  • 4.6.并行度调整结合WebUI
  • 5.Flink里的Sink Operator实战
  • 5.1.Sink Operator速览
  • 5.2.自定义Sink连接Mysql
  • 5.3.自定义Sink连接Redis
  • 5.4.Flink整合KafkaConnetor
  • 6.Flink常用算子Transformation
  • 6.1.Map和FlatMap实战
  • 6.2.RichMap和RichFlatMap实战
  • 6.3.KeyBy分组实战
  • 6.4.filter和sum实战
  • 6.5.reduce聚合实战
  • 6.6.maxBy-max-minBy-min实战
  • 7.Flink滑动-滚动时间窗和触发器
  • 7.1.Window窗口介绍和应用
  • 7.2.Window窗口API和使用流程
  • 7.3.Tumbling-Window滚动时间窗
  • 7.4.Sliding-Window滑动时间窗
  • 7.5.Count-Window数量窗口
  • 8.Flink增量聚合和全窗口函数
  • 8.1.AggregateFunction增量聚合函数
  • 8.2.WindowFunction全窗口函数
  • 8.3.processWindowFunction全窗口函数
  • 9.迟到无序数据处理watermark
  • 9.1.Watermark简介和应用
  • 9.2.Watermark案例实战
  • 9.3.二次兜底延迟数据处理
  • 9.4.最后的兜底延迟数据处理
  • 9.5.Flink多层保证措施归纳
  • 10.Flink状态State管理和Checkpoint
  • 10.1.Flink的状态State管理简介
  • 10.2.Flink状态State后端存储讲解
  • 10.3.Flink的状态State管理实战
  • 10.4.Flink的Checkpoint-SavePoint
  • 10.5.Flink的Checkpoint代码配置
  • 11.Flink复杂事件处理CEP
  • 11.1.Flink复杂事件处理CEP介绍
  • 11.2.Flink的复杂事件CEP常见概念
  • 11.3.Flink复杂事件CEP案例实战
  • 12.Flink项目打包插件+部署
  • 12.1.Flink服务端多种部署模式
  • 12.2.Flink本地模式部署Linux服务器

Flink

1.Flink核心概念简介

1.1.什么是Flink

Apache Flink是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。

官网:https://flink.apache.org/zh/flink-architecture.html

有谁再用?

Apache Flink 为全球许多公司和企业的关键业务提供支持。在这个页面上,我们展示了一些著名的 Flink 用户,他们在生产中运行着有意思的用例,并提供了展示更详细信息的链接。

在项目的 wiki 页面中有一个 谁在使用 Flink 的页面,展示了更多的 Flink 用户。请注意,该列表并不全面。我们只添加明确要求列出的用户。

大厂一般做实时数仓建设、实时数据监控、实时反作弊风控、画像系统等。

大数据流试计算引擎Flink篇_java


大数据流试计算引擎Flink篇_大数据_02


大数据流试计算引擎Flink篇_flink_03

1.2.无界数据和有界数据简介

(1)无界流:有定义流的开始,但没有定义流的结束。他们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都达到在处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,一边能够推断结果的完整性

(2)有界流:有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后在进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理。

大数据流试计算引擎Flink篇_数据_04

Apache Flink 擅长处理无界和有界数据集 精确的时间控制和状态化使得 Flink 的运行时(runtime)能够运行任何处理无界流的应用。有界流则由一些专为固定大小数据集特殊设计的算法和数据结构进行内部处理,产生了出色的性能。

1.3.Flink优化利用内存性能

有状态的 Flink 程序针对本地状态访问进行了优化。任务的状态始终保留在内存中,如果状态大小超过可用内存,则会保存在能高效访问的磁盘数据结构中。任务通过访问本地(通常在内存中)状态来进行所有的计算,从而产生非常低的处理延迟。Flink 通过定期和异步地对本地状态进行持久化存储来保证故障场景下精确一次的状态一致性。

大数据流试计算引擎Flink篇_java_05

1.4.Flink常见的应用场景

(1)事件驱动型应用

什么是事件驱动型应用?

事件驱动型应用是一类具有状态的应用,它从一个或多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作。

事件驱动型应用是在计算存储分离的传统应用基础上进化而来。在传统架构中,应用需要读写远程事务型数据库。

相反,事件驱动型应用是基于状态化流处理来完成。在该设计中,数据和计算不会分离,应用只需访问本地(内存或磁盘)即可获取数据。系统容错性的实现依赖于定期向远程持久化存储写入 checkpoint。下图描述了传统应用和事件驱动型应用架构的区别。

大数据流试计算引擎Flink篇_flink_06

事件驱动型应用的优势?

事件驱动型应用无须查询远程数据库,本地数据访问使得它具有更高的吞吐和更低的延迟。而由于定期向远程持久化存储的 checkpoint 工作可以异步、增量式完成,因此对于正常事件处理的影响甚微。事件驱动型应用的优势不仅限于本地数据访问。传统分层架构下,通常多个应用会共享同一个数据库,因而任何对数据库自身的更改(例如:由应用更新或服务扩容导致数据布局发生改变)都需要谨慎协调。反观事件驱动型应用,由于只需考虑自身数据,因此在更改数据表示或服务扩容时所需的协调工作将大大减少。

典型的事件驱动型应用实例

(2)数据分析应用

什么是数据分析应用?

数据分析任务需要从原始数据中提取有价值的信息和指标。传统的分析方式通常是利用批查询,或将事件记录下来并基于此有限数据集构建应用来完成。为了得到最新数据的分析结果,必须先将它们加入分析数据集并重新执行查询或运行应用,随后将结果写入存储系统或生成报告。

借助一些先进的流处理引擎,还可以实时地进行数据分析。和传统模式下读取有限数据集不同,流式查询或应用会接入实时事件流,并随着事件消费持续产生和更新结果。这些结果数据可能会写入外部数据库系统或以内部状态的形式维护。仪表展示应用可以相应地从外部数据库读取数据或直接查询应用的内部状态。

大数据流试计算引擎Flink篇_事件驱动_07

流式分析应用的优势?

和批量分析相比,由于流式分析省掉了周期性的数据导入和查询过程,因此从事件中获取指标的延迟更低。不仅如此,批量查询必须处理那些由定期导入和输入有界性导致的人工数据边界,而流式查询则无须考虑该问题。

另一方面,流式分析会简化应用抽象。批量查询的流水线通常由多个独立部件组成,需要周期性地调度提取数据和执行查询。如此复杂的流水线操作起来并不容易,一旦某个组件出错将会影响流水线的后续步骤。而流式分析应用整体运行在 Flink 之类的高端流处理系统之上,涵盖了从数据接入到连续结果计算的所有步骤,因此可以依赖底层引擎提供的故障恢复机制。

Flink 如何支持数据分析类应用?

Flink 为持续流式分析和批量分析都提供了良好的支持。具体而言,它内置了一个符合 ANSI 标准的 SQL 接口,将批、流查询的语义统一起来。无论是在记录事件的静态数据集上还是实时事件流上,相同 SQL 查询都会得到一致的结果。同时 Flink 还支持丰富的用户自定义函数,允许在 SQL 中执行定制化代码。如果还需进一步定制逻辑,可以利用 Flink DataStream API 和 DataSet API 进行更低层次的控制。此外,Flink 的 Gelly 库为基于批量数据集的大规模高性能图分析提供了算法和构建模块支持。

(3)数据管道应用

什么是数据管道?

提取-转换-加载(ETL)是一种在存储系统之间进行数据转换和迁移的常用方法。ETL 作业通常会周期性地触发,将数据从事务型数据库拷贝到分析型数据库或数据仓库。

数据管道和 ETL 作业的用途相似,都可以转换、丰富数据,并将其从某个存储系统移动到另一个。但数据管道是以持续流模式运行,而非周期性触发。因此它支持从一个不断生成数据的源头读取记录,并将它们以低延迟移动到终点。例如:数据管道可以用来监控文件系统目录中的新文件,并将其数据写入事件日志;另一个应用可能会将事件流物化到数据库或增量构建和优化查询索引。

大数据流试计算引擎Flink篇_数据_08

数据管道的优势?

和周期性 ETL 作业相比,持续数据管道可以明显降低将数据移动到目的端的延迟。此外,由于它能够持续消费和发送数据,因此用途更广,支持用例更多。

Flink 如何支持数据管道应用?

很多常见的数据转换和增强操作可以利用 Flink 的 SQL 接口(或 Table API)及用户自定义函数解决。如果数据管道有更高级的需求,可以选择更通用的 DataStream API 来实现。Flink 为多种数据存储系统(如:Kafka、Kinesis、Elasticsearch、JDBC数据库系统等)内置了连接器。同时它还提供了文件系统的连续型数据源及数据汇,可用来监控目录变化和以时间分区的方式写入文件。

典型的数据管道应用实例

2.Flink案例入门

2.1.创建Maven项目整合Flink

  • 创建maven项目
<properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.13.1</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.16</version>
    </dependency>
    <!--flink客户端-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!--scala版本-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!--flink-java版本-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!--streamingጱscala版本-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!--streamingጱjava版本-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!--日志输出-->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
        </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
    <!--json依赖包-->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.44</version>
    </dependency>
</dependencies>
新建log4j.properties,日志文件
### 配置appender名称
log4j.rootLogger = debugFile, errorFile
### debug级别以上的日志到:src/logs/debug.log
log4j.appender.debugFile = org.apache.log4j.DailyRollingFileAppender
log4j.appender.debugFile.File = src/logs/flink.log
log4j.appender.debugFile.Append = true
### Threshold属性指定输出等级
log4j.appender.debugFile.Threshold = info
log4j.appender.debugFile.layout = org.apache.log4j.PatternLayout
log4j.appender.debugFile.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %n%m%n
### error级别以上的日志 src/logs/error.log
log4j.appender.errorFile = org.apache.log4j.DailyRollingFileAppender
log4j.appender.errorFile.File = src/logs/error.log
log4j.appender.errorFile.Append = true
log4j.appender.errorFile.Threshold = error
log4j.appender.errorFile.layout = org.apache.log4j.PatternLayout
log4j.appender.errorFile.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %n%m%n

2.2.Tuple数据类型操作

/**
 * @author lixiang
 */
public class Test {
    private static List<String> list = new ArrayList<>();

    static {
        list.add("SpringBoot,Docker");
        list.add("Netty,SpringCloud");
        list.add("Flink,Linux");
    }

    public static void test1() {
        Tuple3<Integer, String, Integer> tuple3 = Tuple3.of(1, "lixiang", 23);
        System.out.println(tuple3.f0);
        System.out.println(tuple3.f1);
        System.out.println(tuple3.f2);
    }

    public static void test2() {
        List<String> collect = list.stream().map(obj -> obj + "拼接").collect(Collectors.toList());
        System.out.println(collect);
    }

    public static void test3() {
        List<String> collect = list.stream().flatMap(obj -> Arrays.stream(obj.split(","))).collect(Collectors.toList());
        System.out.println(collect);
    }
}
public static void main(String[] args) {
     test1();
     test2();
     test3();
}

大数据流试计算引擎Flink篇_flink_09

2.3.Flink流式处理案例

/**
 * @author lixiang
 */
public class FlinkDemo {
    public static void main(String[] args) throws Exception {
        //构建执行任务环境以及任务的启动入口,存储全局相关的参数
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //设置并行度
        env.setParallelism(1);

        //相同类型元素的数据流
        DataStream<String> stringDataStream = env.fromElements("Java,SpringBoot,SpringCloud", "Java,Linux,Docker");

        stringDataStream.print("处理前");

        //FlatMapFunction<Strings,String>,key是输入类型,value是Collector响应的收集的类型,看源码的注释也是DataStream<String>里面泛型类型
        DataStream<String> flatMapDataStream = stringDataStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] arr = value.split(",");
                for (String s : arr) {
                    out.collect(s);
                }
            }
        });
        flatMapDataStream.print("处理后");

        //DataStream需要execute,可以取个名称
        env.execute("data stream job");
    }
}

大数据流试计算引擎Flink篇_数据_10

可以设置多个线程并行执行

//设置并行度
env.setParallelism(3);

大数据流试计算引擎Flink篇_数据_11

2.4.Flink运行流程解析

(1)Flink和Blink关系

  • 2019年Flink的母公司被阿里全资收购
  • 阿里进行高度定制并取名为Blink (加了很多特性 )
  • 阿里巴巴官方说明:Blink不会单独作为一个开源项目运作,而是Flink的一部分
  • 都在不断演进中,对比其他流式计算框架(老到新)
  • Storm 只支持流处理
  • Spark Streaming (流式处理,其实是micro-batch微批处理,本质还是批处理)
  • Flink 支持流批一体

(2)算子Operator

  • 将一个或多个DataStream转换成新的DataStream,可以将多个转换组合成复杂的数据流拓扑
  • Source和Sink是数据输入和数据输出的特殊算子,重点是transformation类的算子
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

(3)Flink在生产环境中的用法

  • flink可以本地idea执行模拟多线程执行,但不能读取配置文件,适合本地调试
  • 可以提交到远程搭建的flink集群
  • getExecutionEnvironment() 是flink封装好的方式可以自动判断运行模式,更方便开发,
  • 如果程序是独立调用的,此方法返回本地执行环境;
  • 如果从命令行客户端调用程序以提交到集群,则返回此集群的执行环境,是最常用的一种创建执行环境的方式

(4)Flink 部署方式是灵活,主要是对Flink计算时所需资源的管理方式不同

  • Local 本地部署,直接启动进程,适合调试使用
  • Standalone Cluster集群部署,flink自带集群模式
  • On Yarn 计算资源统一由Hadoop YARN管理资源进行调度,按需使用提高集群的资源利用率,生产环境

大数据流试计算引擎Flink篇_flink_12

2.5.Flink可视化控制台

(1)增加maven依赖

<!--Flink web ui-->
<dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-runtime-web_${scala.version}</artifactId>
     <version>${flink.version}</version>
</dependency>

访问方式:ip:8081

(2)代码开发

/**
 * flink UI demo
 * @author lixiang
 */
public class FlinkUI {

    public static void main(String[] args) throws Exception {
        //构建执行任务环境以及任务的启动入口
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        //设置并行度
        env.setParallelism(1);
        //监听192.168.139.80:8888输送过来的数据流
        DataStreamSource<String> stream = env.socketTextStream("192.168.139.80", 8888);
        //流处理
        SingleOutputStreamOperator<String> streamOperator = stream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] split = value.split(",");
                for (String s : split) {
                    out.collect(s);
                }
            }
        });
        streamOperator.print("处理后");
        //执行任务
        env.execute("data stream job");
    }

}

大数据流试计算引擎Flink篇_数据_13


大数据流试计算引擎Flink篇_大数据_14

访问:127.0.0.1:8081

大数据流试计算引擎Flink篇_事件驱动_15

  • nc命令介绍
  • Linux nc命令用于设置网络路由的
  • nc -lk 8888
  • 开启 监听模式,用于指定nc将处于监听模式, 等待客户端来链接指定的端口
  • win | linux 需要安装
  • win 百度搜索博文参考不同系统安装
  • 下载地址 https://eternallybored.org/misc/netcat/
  • linux 安装
  • yum install -y netcat
  • yum install -y nc

3.Flink部署和整体架构讲解

3.1.Flink运行流程简介

(1)运行流程

  • 用户提交Flink程序到JobClient
  • JobClient的解析、优化提交到JobManager
  • TaskManager运行task,并上报信息给JobManager

3.2.Flink组件角色介绍

大数据流试计算引擎Flink篇_flink_16

(1)Flink是一个分布式系统,需要有效分配和管理计算资源才能执行流应用程序。

  • 运行时由两种类型的进程组成:
  • 一个JobManager
  • 一个或者多个TaskManager

(2)什么是JobManager

  • 协调Flink应用程序的分布式执行的功能
  • 它决定何时调度下一个task
  • 对完成的task或执行失败做出反应
  • 协调checkpoint、并且协调从失败中恢复等等

(3)什么是TaskManager

  • 负责计算的worker,还有上报内存,任务运行情况给JobManager等
  • 至少有一个TaskManager,也称为worker执行作业流的task,并且缓存和交换数据流
  • 在TaskManager中资源调度的最小单位是task slot

(4)JobManager进程由三个不同的组件组成

  • ResourceManager
  • 负责Flink集群中的资源提供、回收、分配,它管理task slots
  • Dispatcher
  • 提供一个REST接口,用来提交Flink应用程序执行
  • 为每一个提交的作业启动一个新的JobManager
  • 运行Flink WebUI用来提供作业执行信息
  • JobMaster
  • 负责管理单个JobGraph的执行,Flink集群中可以同时运行多个作业,每个作业都有自己的JobManager
  • 至少有一个JobManager,高可用(HA)设置中可能有多个JobManager,其中一个始终是Leader,其他则是standby

(5)TaskManager中task slot的数量表示并发处理task的数量

  • 一个task slot中可以执行多个算子,里面有多个线程
  • 算子opetator
  • source
  • transformation
  • sink
  • 对于分布式执行,Flink将算子的subtasks链接成tasks,每个task由一个线程执行
  • 图中source和map算子组成一个算子链,作为一个task运行在一个线程上
  • 将算子链接成task是个有用的优化:它减少线程间切换、缓冲的开销,并且减少延迟的时增加整体吞吐量

大数据流试计算引擎Flink篇_事件驱动_17


大数据流试计算引擎Flink篇_大数据_18

(6)Task Slots任务槽

  • Task Slot是Flink中的任务执行器,每个Task Slot可以运行多个subtask,每个subtask会以单独的线程来运行
  • 每个worker(TaskManager)是一个JVM进程,可以在单独的线程中执行一个(1个solt)或多个subtask
  • 为了控制一个TaskManager中接受多少个task,就是为了所谓的task slots(至少一个)
  • 每个task slots代表TaskManager中资源的固定子集
  • 注意:
  • 所有Task Slot平均分配TaskManager的内存,TaskSlot没有CPU隔离
  • 当前TaskSlot独占内存空间,作业间互不影响
  • 一个TaskManager进程里面有多少个TaskSlot就意味着多少个并发
  • Task Solt数量建议是cpu的核数,独占内存,共享CPU

大数据流试计算引擎Flink篇_事件驱动_19

  • 5个subtask执行,因此有5个并行线程
  • Task正好封装了一个Operator或者Operator Chain的parallel instance。
  • Sub-Task强调的是同一个Operator或者Operator Chain具有多个并行的Task。
  • 图中source和map算子组成一个算子链,作为一个task运行在一个线程上
  • 算子连接成一个task他减少线程间切换、缓冲的开销,并减少延迟的同事增加整体吞吐量。

大数据流试计算引擎Flink篇_大数据_20

  • Task Slot是Flink中的任务执行器,每个Task Slot可以运行多个Task即subtask,每个subtask会以单独的线程来运行。
  • Flink算子之间可以通过【一对一】模式或者【重新分发】模式传输数据。

3.3.Flink并行度和优先级概念

(1)Flink是分布式流式计算框架

  • 程序在多个节点并行执行,所以就有并行度Parallelism
  • DataStream就像是有向无环图(DAG),每一个数据流(DataStream)以一个或者多个source开始,以一个或者多个sink结束

(2)流程

  • 一个数据流(stream)包含一个或者多个分区,在不同的线程、物理机里并行执行
  • 每一个算子(operator)包含一个或者多个子任务(subtask),子任务在不同的线程、物理机里并行执行
  • 一个算子的子任务subtask的个数就是并行度(parallelism)

(3)并行度的调整配置

  • Flink流程序中不同的算子可能具有不同的并行度,可以在多个地方配置,有不同的优先级
  • Flink并行度配置级别(高到低)
  • 算子
  • map(xxxx).setParallelism(2)
  • 全局env
  • env.setParallelism(2)
  • 客户端cli
  • ./bin/flink run -p 2 xxx.jar
  • Flink配置文件
  • /conf/flink-conf.yaml的parallelism.default默认值
  • 本地IDEA运行,并行度默认为cpu核数

(4)一个很重要的区分TaskSolt和parallelism并行度配置

  • taskslot是静态的概念,是指taskmanager具有的并发执行能力
  • parallelism是动态的概念,是指程序运行时实际使用的并发能力

(5)Flink有3种运行模式

env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
  • STRAMING 流处理
  • BATCH 批处理
  • AUTOMATCH 根据cource类型自动选择运行模式,基本就是使用这个

4.Flink里的Source Operator实战

4.1.Source Operator速览

(1)Flink的API层级为流式/批式处理应用程序的开发提供了不同级别的抽象

  • 第一层是最底层的抽象为有状态实时流处理,抽象实现是ProcessFunction,用于底层处理
  • 第二层抽象是Core APIs,许多应用程序不需要用到上述最底层抽象的API,而是使用Core APIs进行开发
  • 例如各种形式的用户自定义转换(transformations)、联接(joins)、聚合(aggregations)、窗口(windows)和状态(state)操作等,此层API中处理的数据类型在每种编程语言中都有对应的类。
  • 第三层抽象是Table API。是以表Table为中心的声明式编程API,Table API使用起来很简洁但是表达能力差
  • 类似数据库中关系模型中的操作,比如select、project、join、group by和aggregate等
  • 允许用户在编写应用程序时将Table API与DataStream/DataSet API混合使用
  • 第四层最顶层抽象是SQL,这层程序表达式上都类似于Table API,但是器程序实现都是SQL查询表达式
  • SQL抽象与Table API抽象之间的关联是非常紧密的

大数据流试计算引擎Flink篇_数据_21

(2)Flink编程模型

大数据流试计算引擎Flink篇_java_22

(3)Source来源

  • 元素集合
  • env.fromElements
  • env.fromColletion
  • env.fromSequence(start,end);
  • 文件/文件系统
  • env.readTextFile(本地文件);
  • env.readTextFile(HDFS文件);
  • 基于Socket
  • env.socketTextStream(“ip”, 8888)
  • 自定义Source,实现接口自定义数据源,rich相关的api更丰富
  • 并行度为1
  • SourceFunction
  • RichSourceFunction
  • 并行度大于1
  • ParallelSourceFunction
  • RichParallelSourceFunction

(4)Connectors与第三方系统进行对接(用于source或者sink都可以)

  • Flink本身提供Connector例如kafka、RabbitMQ、ES等
  • 注意:Flink程序打包一定要将相应的connetor相关类打包进去,不然就会失败

(5)Apache Bahir连接器

  • 里面也有kafka、RabbitMQ、ES的连接器更多

4.2.元素集合类型Source

元素集合

  • env.fromElements
  • env.fromColletion
  • env.fromSequence(start,end)

代码实战

/**
 * @author lixiang
 */
public class FlinkSourceDemo {
    public static void main(String[] args) throws Exception {
        //构建执行任务环境以及任务的启动入口
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> ds1 = env.fromElements("java,springboot", "kafka,redis", "openstack,k8s,docker");
        ds1.print("ds1");

        DataStreamSource<String> ds2 = env.fromCollection(Arrays.asList("hive", "hadoop", "hbase", "rabbitmq", "java"));
        ds2.print("ds2");

        DataStreamSource<Long> ds3 = env.fromSequence(0, 10);
        ds3.print("ds3");

        //执行任务
        env.execute("data job");
    }
}

大数据流试计算引擎Flink篇_大数据_23

4.3.文件/文件系统Source

文件/文件系统

  • env.readTextFile(本地文件)
  • env.readTextFile(HDFS文件)
/**
 * @author lixiang
 */
public class FlinkSourceDemo2 {
    public static void main(String[] args) throws Exception {
        //构建执行任务环境以及任务的启动的入口
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataStream<String> textDS = env.readTextFile("E:\\软件资料\\log.txt");
        //DataStream<String> hdfsDS = env.readTextFile("hdfs://lixiang:8010/file/log/words.txt");
        textDS.print("textDS");
        env.execute("text job");
    }
}

大数据流试计算引擎Flink篇_数据_24

4.4.基于Socket的Source

基于Socket

  • env.socketTextStream(“ip”,8888)
/**
 * flink UI demo
 * @author lixiang
 */
public class FlinkUI {

    public static void main(String[] args) throws Exception {
        //构建执行任务环境以及任务的启动入口
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        //设置并行度
        env.setParallelism(1);
        //监听192.168.139.80:8888输送过来的数据流
        DataStreamSource<String> stream = env.socketTextStream("192.168.139.80", 8888);
        //流处理
        SingleOutputStreamOperator<String> streamOperator = stream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] split = value.split(",");
                for (String s : split) {
                    out.collect(s);
                }
            }
        });
        streamOperator.print("处理后");
        //执行任务
        env.execute("data stream job");
    }

}

大数据流试计算引擎Flink篇_java_25


大数据流试计算引擎Flink篇_大数据_26

4.5.自定义Source

自定义Source,实现接口自定义数据源

  • 并行度为1
  • SourceFunction
  • RichSourceFunction
  • 并行度大于1
  • ParallelSourceFunction
  • RichParallelSourceFunction
  • Rich相关的api更丰富,多了Open和Close方法,用于初始化连接,关闭等

设置实体VideoOrder

/**
 * 订单实体类
 * @author lixiang
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class VideoOrder
{
    private String tradeNo;
    private String title;
    private int money;
    private int userId;
    private Date createTime;
}

设置VideoOrderSource

/**
 * @author lixiang
 */
public class VideoOrderSource extends RichParallelSourceFunction<VideoOrder>
{

    private volatile Boolean flag = true;

    private Random random = new Random();

    private static List<String> list = new ArrayList<>();

    static
    {
        list.add("SpringBoot2.x课程");
        list.add("Linux入到到精通");
        list.add("Flink流式技术课程");
        list.add("Kafka流式处理消息平台");
        list.add("微服务SpringCloud教程");
    }

    @Override
    public void run(SourceContext<VideoOrder> sourceContext) throws Exception {
        int x = 0;
        while (flag)
        {
            Thread.sleep(1000);
            String id = UUID.randomUUID().toString();
            int userId = random.nextInt(10);
            int money = random.nextInt(100);
            int videoNum = random.nextInt(list.size());
            String title = list.get(videoNum);
            sourceContext.collect(new VideoOrder(id, title, money, userId, new Date()));
            x++;
            if (x == 10)
            {
                cancel();
            }
        }
    }

    /**
     * 取消任务
     */
    @Override
    public void cancel()
    {
        flag = false;
    }
}

编写Flink任务

/**
 * @author lixiang
 */
public class FlinkMainSource {
    public static void main(String[] args) throws Exception {
        //构建执行任务环境以及任务的启动入口
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataStreamSource<VideoOrder> source = env.addSource(new VideoOrderSource());

        source.print("接入的数据");

        SingleOutputStreamOperator<Integer> streamOperator = source.flatMap(new FlatMapFunction<VideoOrder, Integer>() {
            @Override
            public void flatMap(VideoOrder value, Collector<Integer> out) throws Exception {
                out.collect(value.getMoney());
            }
        });
        streamOperator.print("处理后");
        //流程启动
        env.execute("custom source job");
    }
}

大数据流试计算引擎Flink篇_数据_27

4.6.并行度调整结合WebUI

(1)开启WebUI

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

(2)设置不同并行度

大数据流试计算引擎Flink篇_flink_28

  • 数据流中最大的并行度,就是算子链中最大算子的数量,比如source 2个并行度,filter 4个,sink 4个,最大就是4

大数据流试计算引擎Flink篇_数据_29

5.Flink里的Sink Operator实战

5.1.Sink Operator速览

  • Flink编程模型

大数据流试计算引擎Flink篇_大数据_30

(1)Sink输出源

  • 预定义
  • print
  • writeAsText(过期)
  • 自定义
  • SinkFunction
  • RichSinkFunction
  • Rich相关的api更丰富,多了Open、Close方法,用于初始化连接等
  • Flink官方提供的Bundle Connector
  • Kafka、ES等
  • Apache Bahir
  • Kafka、ES、Redis等

5.2.自定义Sink连接Mysql

(1)部署MySQL环境

docker pull mysql:5.7
docker run -itd -p 3306:3306 --name my-mysql -e MYSQL_ROOT_PASSWORD=123456 mysql:5.7

(2)连接MySQL创建表

大数据流试计算引擎Flink篇_flink_31

CREATE TABLE `video_order` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `user_id` int(11) DEFAULT NULL,
  `money` int(11) DEFAULT NULL,
  `title` varchar(32) DEFAULT NULL,
  `trade_no` varchar(64) DEFAULT NULL,
  `create_time` date DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

(3)加入flink-mysql依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.12</artifactId>
    <version>1.12.0</version>
</dependency>

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.25</version>
</dependency>

(4)编写MySQLSink

/**
 * @author lixiang
 */
public class MysqlSink extends RichSinkFunction<VideoOrderDO> {

    private Connection conn = null;
    private PreparedStatement ps = null;

    @Override
    public void invoke(VideoOrderDO videoOrder, Context context) throws Exception {
        //给ps中的?设置具体值
        ps.setInt(1, videoOrder.getUserId());
        ps.setInt(2, videoOrder.getMoney());
        ps.setString(3, videoOrder.getTitle());
        ps.setString(4, videoOrder.getTradeNo());
        ps.setDate(5, new Date(videoOrder.getCreateTime().getTime()));
        int i = ps.executeUpdate();
        System.out.println("处理数据,插入数据库结果:" + (i > 0));
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("---open---");
        conn = DriverManager.getConnection("jdbc:mysql://192.168.139.20:3306/flink?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&serverTimezone=Asia/Shanghai", "root", "123456");
        String sql = "INSERT INTO `video_order` (`user_id`, `money`, `title`, `trade_no`, `create_time`) VALUES(?,?,?,?,?);";
        ps = conn.prepareStatement(sql);
    }

    @Override
    public void close() throws Exception {
        if (conn != null) {
            conn.close();
        }
        if (ps != null) {
            ps.close();
        }
        System.out.println("---close---");
    }
}

(5)整合MySQLSink

public class FlinkMainSink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);

        DataStreamSource<VideoOrderDO> source = env.addSource(new VideoOrderSource());
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3, // 尝试重启的次数
                Time.of(10, TimeUnit.SECONDS) // 间隔
        ));
        source.print("接收的数据");

        source.addSink(new MysqlSink());
        //流程启动
        env.execute("custom sink job");
    }
}

(6)运行结果

大数据流试计算引擎Flink篇_事件驱动_32


大数据流试计算引擎Flink篇_数据_33

5.3.自定义Sink连接Redis

(1)部署Redis环境

拉取镜像:docker pull redis
启动容器:docker run -d --name redis -p 6379:6379 redis --requirepass "123456"

(2)Flink怎么操作redis?

  • 方式一:自定义sink
  • 方式二:使用connector

(3)Redis Sink 核心是RedisMapper 是一个接口,使用时要编写自己的redis操作类实现这个接口中的三个方法

  • getCommandDescription 选择对应的数据结构和key名称配置
  • getKeyFromData 获取key
  • getValueFromData 获取value

(4)添加redis的connector依赖,使用connector整合redis

<dependency>
     <groupId>org.apache.bahir</groupId>
     <artifactId>flink-connector-redis_2.11</artifactId>
     <version>1.0</version>
</dependency>

(5)自定义RedisSink

/**
 * 定义泛型,就是要返回的类型
 * @author lixiang
 */
public class MyRedisSink implements RedisMapper<Tuple2<String,Integer>> {

    /**
     * 选择对应的数据结构,和key的名称
     * @return
     */
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET,"VIDEO_ORDER_COUNTER");
    }
    
    /**
     * 返回key
     * @param value
     * @return
     */
    @Override
    public String getKeyFromData(Tuple2<String,Integer> value) {
        return value.f0;
    }

    /**
     * 返回value
     * @param value
     * @return
     */
    @Override
    public String getValueFromData(Tuple2<String,Integer> value) {
        return value.f1.toString();
    }
}

(5)编写Flink任务类

/**
 * @author lixiang
 */
public class FlinkRedisDemo {
    public static void main(String[] args) throws Exception {
        //构建执行任务环境以及任务的启动入口
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        //自己构建的数据源
        /*DataStream<VideoOrderDO> ds = env.fromElements(new VideoOrderDO(5, 32, "java", "2123143432", new Date()),
                new VideoOrderDO(5, 40, "spring", "2123143432", new Date()),
                new VideoOrderDO(5, 60, "springBoot", "2233143432", new Date()),
                new VideoOrderDO(5, 29, "springBoot", "2125643432", new Date()),
                new VideoOrderDO(5, 67, "docker", "2129843432", new Date()),
                new VideoOrderDO(5, 89, "java", "2120943432", new Date()));*/

        //使用自定义的source
        DataStream<VideoOrderDO> ds = env.addSource(new VideoOrderSource());

        //map转换。来一个记录一个,方便后续统计
        DataStream<Tuple2<String,Integer>> mapDS = ds.map(new MapFunction<VideoOrderDO, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String,Integer> map(VideoOrderDO value) throws Exception {
                return new Tuple2<>(value.getTitle(), 1);
            }
        });

        KeyedStream<Tuple2<String, Integer>, String> keyedStream = mapDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });

        DataStream<Tuple2<String, Integer>> sumDS = keyedStream.sum(1);

        //输出统计
        sumDS.print();

        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("192.168.139.20").setPassword("123456").setPort(6379).build();

        sumDS.addSink(new RedisSink<>(conf,new MyRedisSink()));

        //DataStream需要调用execute,可以取这个名称
        env.execute("custom redis job");
    }
}
/**
 * 自定义的source
 * @author lixiang
 */
public class VideoOrderSource extends RichParallelSourceFunction<VideoOrderDO>
{

    private volatile Boolean flag = true;

    private Random random = new Random();

    private static List<String> list = new ArrayList<>();

    static
    {
        list.add("SpringBoot2.x");
        list.add("Linux");
        list.add("Flink");
        list.add("Kafka");
        list.add("SpringCloud");
        list.add("SpringBoot");
        list.add("Docker");
        list.add("Netty");
    }

    @Override
    public void run(SourceContext<VideoOrderDO> sourceContext) throws Exception {
        int x = 0;
        while (flag)
        {
            Thread.sleep(1000);
            String id = UUID.randomUUID().toString();
            int userId = random.nextInt(10);
            int money = random.nextInt(100);
            int videoNum = random.nextInt(list.size());
            String title = list.get(videoNum);
            String uuid = UUID.randomUUID().toString();
            sourceContext.collect(new VideoOrderDO(userId, money, title,uuid, new Date()));
        }
    }

    /**
     * 取消任务
     */
    @Override
    public void cancel()
    {
        flag = false;
    }
}

大数据流试计算引擎Flink篇_flink_34

5.4.Flink整合KafkaConnetor

(1)Kafka环境搭建

  • 拉取镜像,部署容器
#zk部署
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper

#kafka部署,换成自己的IP
docker run -d --name kafka \
-p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=192.168.139.20:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.139.20:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka
  • 进入容器内部,创建Topic
#进入容器内部,创建topic
docker exec -it kafka /bin/bash

cd /opt/kafka
bin/kafka-topics.sh --create --zookeeper 192.168.139.20:2181 --replication-factor 1 --partitions 1 --topic test-topic
  • 生产者生产消息,消费者消费消息
#创建生产者发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic

#运行一个消费者,注意--from-beginning从开头第一个开始消费
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning

(2)Flink整合Kafka读取消息,发送消息

  • 之前自定义SourceFunction,Flink官方也有提供对接外部系统的,比如读取Kafka
  • flink官方提供的连接器
  • 添加依赖
<dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-connector-kafka_${scala.version}</artifactId>
     <version>${flink.version}</version>
</dependency>
  • 编写Flink任务类
/**
 * @author lixiang
 */
public class FlinkKafka {
    public static void main(String[] args) throws Exception {
        //构建执行任务环境以及任务的启动入口
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        //kafka地址
        props.setProperty("bootstrap.servers", "192.168.139.20:9092");
        //组名
        props.setProperty("group.id", "video-order-group");
        //字符串序列化和反序列化规则
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //offset重置规则
        props.setProperty("auto.offset.reset", "latest");
        //自动提交
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "2000");
        //有后台线程每隔10s检测一下Kafka的分区变化情况
        props.setProperty("flink.partition-discovery.interval-millis","10000");

        //监听test-topic发送的消息
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test-topic",new SimpleStringSchema(),props);

        consumer.setStartFromGroupOffsets();

        DataStream<String> consumerDS = env.addSource(consumer);

        consumerDS.print("test-topic接收的消息");

        //接到消息后,处理
        DataStream<String> dataStream = consumerDS.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return "新来一个订单课程:"+value;
            }
        });
		//处理后的消息发送到order-topic
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("order-topic",new SimpleStringSchema(),props);
        dataStream.addSink(producer);
        env.execute("kafka job");
    }
}
  • 测试testtopic发送消息,order-topic消费消息
#创建生产者发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic

#运行一个消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic order-topic --from-beginning

大数据流试计算引擎Flink篇_大数据_35


大数据流试计算引擎Flink篇_java_36


大数据流试计算引擎Flink篇_数据_37

6.Flink常用算子Transformation

6.1.Map和FlatMap实战

(1)java里面的Map操作

  • 一对一转换对象
/**
 * @author lixiang
 * flink map算子demo
 */
public class FlinkMapDemo {
    public static void main(String[] args) throws Exception {
        //构建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataStream<VideoOrderDO> streamSource = env.addSource(new VideoOrderSource());
        streamSource.print("处理前数据");

        DataStream<Tuple2<String,Integer>> dataStream = streamSource.map(new MapFunction<VideoOrderDO, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String, Integer> map(VideoOrderDO value) throws Exception {
                return new Tuple2<>(value.getTitle(),value.getMoney());
            }
        });

        dataStream.print("处理后");

        env.execute("map job");
    }
}

大数据流试计算引擎Flink篇_flink_38

(2)java里面的FlatMap操作

  • 一对多转换对象
/**
 * @author lixiang
 * flatMap 算子demo
 */
public class FlinkFlatMapDemo {
    public static void main(String[] args) throws Exception {
        //构建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataStream<String> ds = env.fromElements("java&35,spring&20,springboot&30", "springcloud&21,shiro&39,docker&56,linux&87", "netty&98,kafka&48");
        ds.print("处理前");
        DataStream<Tuple2<String,Integer>> out = ds.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] element = value.split(",");
                for (String s : element) {
                    String[] eles = s.split("&");
                    out.collect(new Tuple2<>(eles[0],Integer.parseInt(eles[1])));
                }
            }
        });

        out.print("处理后");

        env.execute("flatMap job");

    }
}

大数据流试计算引擎Flink篇_数据_39

6.2.RichMap和RichFlatMap实战

  • Rich相关的api多了open、close方法,用于初始化连接
  • RichXXX相关open、close、setRuntimeContext等Api方法会根据并行度进行操作的
  • 比如并行度是4,那就有4次触发对应的open、close方法等,是4个不同的subtask
  • 如:RichMapFunction、RichFlatMapFunction、RichSourceFunction等

(1)RichMap实战

/**
 * @author lixiang
 * flink map算子demo
 */
public class FlinkMapDemo {
    public static void main(String[] args) throws Exception {
        //构建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataStream<VideoOrderDO> streamSource = env.addSource(new VideoOrderSource());
        streamSource.print("处理前数据");

        DataStream<Tuple2<String,Integer>> dataStream = streamSource.map(new RichMapFunction<VideoOrderDO, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String, Integer> map(VideoOrderDO value) throws Exception {
                return new Tuple2<>(value.getTitle(),value.getMoney());
            }

            @Override
            public void open(Configuration parameters) throws Exception {
                System.out.println("open方法执行");
            }

            @Override
            public void close() throws Exception {
                System.out.println("close方法执行");
            }
        });

        dataStream.print("处理后");

        env.execute("map job");
    }
}

大数据流试计算引擎Flink篇_java_40

(2)RichFlatMapFunction实战

/**
 * @author lixiang
 * flatMap 算子demo
 */
public class FlinkFlatMapDemo {
    public static void main(String[] args) throws Exception {
        //构建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataStream<String> ds = env.fromElements("java&35,spring&20,springboot&30", "springcloud&21,shiro&39,docker&56,linux&87", "netty&98,kafka&48");
        ds.print("处理前");
        DataStream<Tuple2<String,Integer>> out = ds.flatMap(new RichFlatMapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] element = value.split(",");
                for (String s : element) {
                    String[] eles = s.split("&");
                    out.collect(new Tuple2<>(eles[0],Integer.parseInt(eles[1])));
                }
            }

            @Override
            public void open(Configuration parameters) throws Exception {
                System.out.println("open方法执行");
            }

            @Override
            public void close() throws Exception {
                System.out.println("close方法执行");
            }
        });

        out.print("处理后");

        env.execute("flatMap job");

    }
}

大数据流试计算引擎Flink篇_事件驱动_41

6.3.KeyBy分组实战

  • KeyBy分组是把数据流按照某个字段分区,指定字段相同的数据放在同个组中,在进行组内统计
/**
 * @author lixiang
 * keyBy 算子demo
 */
public class FlinkKeyByDemo {
    public static void main(String[] args) throws Exception {
        //构建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);
        DataStream<VideoOrderDO> dataStream = env.addSource(new VideoOrderSource());

        //根据title进行分组
        KeyedStream<VideoOrderDO,String> keyedStream = dataStream.keyBy(new KeySelector<VideoOrderDO, String>() {
            @Override
            public String getKey(VideoOrderDO value) throws Exception {
                return value.getTitle();
            }
        });

        //分组后将相同标题的money进行累加
        SingleOutputStreamOperator<VideoOrderDO> sumDS = keyedStream.sum("money");

        //map转换
        DataStream<Tuple2<String, Integer>> outputStreamOperator = sumDS.map(new MapFunction<VideoOrderDO, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String, Integer> map(VideoOrderDO value) throws Exception {
                return new Tuple2<>(value.getTitle(),value.getMoney());
            }
        });

        outputStreamOperator.print();

        env.execute("keyBy job");
    }
}

大数据流试计算引擎Flink篇_数据_42

6.4.filter和sum实战

  • filter过滤算子
  • sum求和算子
/**
 * @author lixiang
 * flink filter算子demo
 * 先过滤money大于30的,然后根据标题进行分组,然后求每组money总和,最后map转换
 */
public class FlinkFliterDemo {
    public static void main(String[] args) throws Exception {
        //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        DataStreamSource<VideoOrderDO> ds = env.addSource(new VideoOrderSource());

        DataStream<Tuple2<String,Integer>> out = ds.filter(new FilterFunction<VideoOrderDO>() {
            @Override
            public boolean filter(VideoOrderDO value) throws Exception {
                return value.getMoney()>30;
            }
        }).keyBy(new KeySelector<VideoOrderDO, String>() {
            @Override
            public String getKey(VideoOrderDO value) throws Exception {
                return value.getTitle();
            }
        }).sum("money").map(new MapFunction<VideoOrderDO, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String, Integer> map(VideoOrderDO value) throws Exception {
                return new Tuple2<>(value.getTitle(), value.getMoney());
            }
        });

        out.print();

        env.execute("filter sum job");
    }
}

大数据流试计算引擎Flink篇_事件驱动_43

6.5.reduce聚合实战

  • reduce函数
  • keyBy分组后聚合统计sum和reduce实现一样的效果
  • reduce和sum区别
  • sum(“xxx”)使用的时候,如果是tuple元组则用序号,POJO则用属性名称
  • keyBy分组后聚合统计sum和reduce实现一样的效果
  • sum是简单聚合,reduce是可以自定义聚合,aggregate支持复杂的自定义聚合
/**
 * @author lixiang
 * reduce 算子demo
 */
public class FlinkReduceDemo {
    public static void main(String[] args) throws Exception {
        //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        DataStreamSource<VideoOrderDO> ds = env.addSource(new VideoOrderSource());

        SingleOutputStreamOperator<Tuple2<String,Integer>> reduce = ds.keyBy(new KeySelector<VideoOrderDO, String>() {
            @Override
            public String getKey(VideoOrderDO value) throws Exception {
                return value.getTitle();
            }
        }).reduce(new AggregationFunction<VideoOrderDO>() {
            //value1是历史对象,value2是加入统计的对象,所以value1.f1是历史值,value2.f1是新值,不断累加
            @Override
            public VideoOrderDO reduce(VideoOrderDO value1, VideoOrderDO value2) throws Exception {
                value1.setMoney(value1.getMoney() + value2.getMoney());
                return value1;
            }
        }).map(new MapFunction<VideoOrderDO, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String,Integer> map(VideoOrderDO value) throws Exception {
                return new Tuple2<>(value.getTitle(),value.getMoney());
            }
        });

        reduce.print();

        env.execute("reduce job");
    }
}

大数据流试计算引擎Flink篇_flink_44

6.6.maxBy-max-minBy-min实战

  • 如果是用了keyBy,在后续算子要用maxBy,minBy类型,才可以再分组里面找对应的数据
  • 如果用max、min等,就不确定是哪个key中选了
  • 如果是keyBy的是对象的某个属性,则分组用max/min聚合统计,只有聚合的字段会更新,其他字段还是旧的,导致对象不准确
  • 需要用maxBy/minBy才对让整个对象的属性都是最新的
  • max、min出现的问题
/**
 * @author lixiang
 * maxBy-max-minBy-min的使用
 */
public class FlinkMinMaxDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //env.setParallelism(1);
        DataStream<VideoOrderDO> ds = env.fromElements(new VideoOrderDO(5, 32, "java", "2123143432", new Date()),
                new VideoOrderDO(25, 40, "spring", "2123143432", new Date()),
                new VideoOrderDO(45, 60, "springBoot", "2233143432", new Date()),
                new VideoOrderDO(15, 29, "springBoot", "2125643432", new Date()),
                new VideoOrderDO(54, 67, "java", "2129843432", new Date()),
                new VideoOrderDO(59, 89, "java", "2120943432", new Date()));

        SingleOutputStreamOperator<VideoOrderDO> out = ds.keyBy(new KeySelector<VideoOrderDO, String>() {
            @Override
            public String getKey(VideoOrderDO value) throws Exception {
                return value.getTitle();
            }
        }).max("money");

        out.print();

        env.execute("max job");
    }
}

大数据流试计算引擎Flink篇_事件驱动_45

  • maxBy、minBy就不会出现这种问题
/**
 * @author lixiang
 * maxBy-max-minBy-min的使用
 */
public class FlinkMinMaxDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //env.setParallelism(1);
        DataStream<VideoOrderDO> ds = env.fromElements(new VideoOrderDO(5, 32, "java", "2123143432", new Date()),
                new VideoOrderDO(25, 40, "spring", "2123143432", new Date()),
                new VideoOrderDO(45, 60, "springBoot", "2233143432", new Date()),
                new VideoOrderDO(15, 29, "springBoot", "2125643432", new Date()),
                new VideoOrderDO(54, 67, "java", "2129843432", new Date()),
                new VideoOrderDO(59, 89, "java", "2120943432", new Date()));

        SingleOutputStreamOperator<VideoOrderDO> out = ds.keyBy(new KeySelector<VideoOrderDO, String>() {
            @Override
            public String getKey(VideoOrderDO value) throws Exception {
                return value.getTitle();
            }
        }).maxBy("money");

        out.print();

        env.execute("max job");
    }
}

大数据流试计算引擎Flink篇_java_46

7.Flink滑动-滚动时间窗和触发器

7.1.Window窗口介绍和应用

  • 背景
  • 数据流是一直源源不断产生,业务需要聚合统计使用,比如每10s统计过去5分钟的点击量、成交额等。
  • Windows就可以将无限的数据流拆分成有限大小的桶"buckets",然后程序可以对其窗口内的数据进行计算
  • 窗口认为是bucket桶,一个窗口段就是一个桶,比如8到9点是一个桶,9到10点是一个桶
  • 分类
  • time Window时间窗口,即按照一定时间规则作为窗口统计
  • time-sliding-window时间滑动窗口
  • time-tumbing-window时间滚动窗口
  • session Window会话窗口,即一个会话内的数据进行统计
  • count Window数量窗口,即按照一定的数量作为窗口统计

(1)窗口属性

  • 滑动窗口 Sliding Windows
  • 窗口具有固定大小
  • 窗口数据有重叠
  • 例子:每10s统计一次最近1min内的订单数量

大数据流试计算引擎Flink篇_大数据_47

  • 滚动窗口 Tumbling Windows
  • 窗口具有固定大小
  • 窗口数据不重叠
  • 例子:每10s统计一次最近10s内的订单数量

大数据流试计算引擎Flink篇_flink_48

(2)窗口大小size 和 滑动间隔 slide

  • tumbling-window:滚动窗口: size=slide,如:每隔10s统计最近10s的数据
  • sliding-window:滑动窗口: size>slide,如:每隔5s统计最近10s的数据

7.2.Window窗口API和使用流程

(1)什么情况下才可以使用WindowAPI

  • 有keyBy用window()api
  • 没keyBy用windowAll()api,并行度低

大数据流试计算引擎Flink篇_数据_49

  • 一个窗口内 的是左闭右开
  • countWindow没过期,但timeWindow在1.12过期,统一使用window;

(2)窗口分配器Window Assigners

  • 定义了如何将元素分配给窗口,负责将每条数据分发到正确的window窗口上
  • window()的参数是一个WindowAssigner,flink本身提供了Tumbling、Sliding等Assigner

(3)窗口触发器trigger

  • 用来控制一个窗口是否被触发
  • 每个窗口分配器WindowAssigner都有一个默认的触发器,也支持自定义触发器

(4)窗口window function,对窗口内的数据操作

  • 增量聚合函数
aggregate(agg函数,WindowFunction(){  })
  • 窗口保存临时数据,每进入一个新数据,会与中间数据累加,生成新的中间数据,再保存到窗口中
  • 常见的增量聚合函数有 reduceFunction、aggregateFunction
  • min、max、sum 都是简单的聚合操作,不需要自定义规则
AggregateFunction<IN, ACC, OUT>
IN是输入类型,ACC是中间聚合状态类型,OUT是输出类型,是聚合统计当前窗口的数据
  • 全窗口函数
apply(new processWindowFunction(){})
  • 窗口先缓存该窗口所有元素,等窗口的全部数据收集起来后再触发条件计算
  • 常见的全窗口聚合函数 windowFunction(未来可能弃用)、processWindowFunction(可以获取到窗口上下文 更多信息)
IN是输入类型,OUT是输出类型,KEY是分组类型,W是时间窗
WindowFunction<IN, OUT, KEY, W extends Window>

如果想处理每个元素更底层的API的时候用

//对数据进行解析 ,process对每个元素进行处理,相当于 map+flatMap+filter
process(new KeyedProcessFunction(){processElement、onTimer})

7.3.Tumbling-Window滚动时间窗

  • 滚动窗口 Tumbling Windows
  • 窗口具有固定大小
  • 窗口数据不重叠
  • 比如指定了一个5分钟大小的滚动窗口,无限流的数据会根据时间划分为[0:00, 0:05)、[0:05, 0:10)、[0:10, 0:15)等窗口
  • 代码实战
/**
 * @author lixiang
 * Tumbling-Window滚动窗口
 */
public class FlinkTumblingDemo {
    public static void main(String[] args) throws Exception {
        //构建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        DataStream<VideoOrderDO> ds = env.addSource(new VideoOrderSource());

        KeyedStream<VideoOrderDO, String> keyedStream = ds.keyBy(new KeySelector<VideoOrderDO, String>() {
            @Override
            public String getKey(VideoOrderDO value) throws Exception {
                return value.getTitle();
            }
        });
        SingleOutputStreamOperator<Map<String, Object>> map = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum("money").map(new MapFunction<VideoOrderDO, Map<String, Object>>() {
            @Override
            public Map<String, Object> map(VideoOrderDO value) throws Exception {
                Map<String, Object> map = new HashMap<>();
                map.put("title", value.getTitle());
                map.put("money", value.getMoney());
                map.put("createDate", TimeUtil.toDate(value.getCreateTime()));
                return map;
            }
        });

        map.print();
        env.execute("Tumbling Window job");

    }
}

大数据流试计算引擎Flink篇_java_50

7.4.Sliding-Window滑动时间窗

  • 滑动窗口 Sliding Windows
  • 窗口具有固定大小
  • 窗口数据有重叠
  • 例子:每5s统计一次最近20s内的订单数量
/**
 * @author lixiang
 * Tumbling-Window滚动窗口
 */
public class FlinkSlidingDemo {
    public static void main(String[] args) throws Exception {
        //构建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        DataStream<VideoOrderDO> ds = env.addSource(new VideoOrderSource());

        KeyedStream<VideoOrderDO, String> keyedStream = ds.keyBy(new KeySelector<VideoOrderDO, String>() {
            @Override
            public String getKey(VideoOrderDO value) throws Exception {
                return value.getTitle();
            }
        });
        //每5s去统计过去20s的数据
        SingleOutputStreamOperator<Map<String, Object>> map = keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(20),Time.seconds(5))).sum("money").map(new MapFunction<VideoOrderDO, Map<String, Object>>() {
            @Override
            public Map<String, Object> map(VideoOrderDO value) throws Exception {
                Map<String, Object> map = new HashMap<>();
                map.put("title", value.getTitle());
                map.put("money", value.getMoney());
                map.put("createDate", TimeUtil.toDate(value.getCreateTime()));
                return map;
            }
        });

        map.print();
        env.execute("Sliding Window job");
    }
}

大数据流试计算引擎Flink篇_java_51

7.5.Count-Window数量窗口

  • 基于数量的滚动窗口, 滑动计数窗口
  • 统计分组后同个key内的数据超过5次则进行统计 countWindow(5)
/**
 * @author lixiang
 * Tumbling-Window滚动窗口
 */
public class FlinkWindow1Demo {
    public static void main(String[] args) throws Exception {
        //构建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        DataStream<VideoOrderDO> ds = env.addSource(new VideoOrderSource());

        KeyedStream<VideoOrderDO, String> keyedStream = ds.keyBy(new KeySelector<VideoOrderDO, String>() {
            @Override
            public String getKey(VideoOrderDO value) throws Exception {
                return value.getTitle();
            }
        });
        SingleOutputStreamOperator<Map<String, Object>> map = keyedStream.countWindow(5).sum("money").map(new MapFunction<VideoOrderDO, Map<String, Object>>() {
            @Override
            public Map<String, Object> map(VideoOrderDO value) throws Exception {
                Map<String, Object> map = new HashMap<>();
                map.put("title", value.getTitle());
                map.put("money", value.getMoney());
                map.put("createDate", TimeUtil.toDate(value.getCreateTime()));
                return map;
            }
        });

        map.print();

        env.execute("Count Window job");

    }
}

大数据流试计算引擎Flink篇_事件驱动_52

  • 只要有2个数据到达后就可以往后统计5个数据的值, countWindow(5, 2)
/**
 * @author lixiang
 * Tumbling-Window滚动窗口
 */
public class FlinkWindow1Demo {
    public static void main(String[] args) throws Exception {
        //构建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        DataStream<VideoOrderDO> ds = env.addSource(new VideoOrderSource());

        KeyedStream<VideoOrderDO, String> keyedStream = ds.keyBy(new KeySelector<VideoOrderDO, String>() {
            @Override
            public String getKey(VideoOrderDO value) throws Exception {
                return value.getTitle();
            }
        });
        SingleOutputStreamOperator<Map<String, Object>> map = keyedStream.countWindow(5,2).sum("money").map(new MapFunction<VideoOrderDO, Map<String, Object>>() {
            @Override
            public Map<String, Object> map(VideoOrderDO value) throws Exception {
                Map<String, Object> map = new HashMap<>();
                map.put("title", value.getTitle());
                map.put("money", value.getMoney());
                map.put("createDate", TimeUtil.toDate(value.getCreateTime()));
                return map;
            }
        });

        map.print();

        env.execute("Count Window job");

    }
}

大数据流试计算引擎Flink篇_大数据_53

8.Flink增量聚合和全窗口函数

8.1.AggregateFunction增量聚合函数

  • 增量聚合函数
aggregate(agg函数,WindowFunction(){  })
  • 窗口保存临时数据,每进入一个新数据,会与中间数据累加,生成新的中间数据,再保存到窗口中
  • 常见的增量聚合函数有 reduceFunction、aggregateFunction
  • min、max、sum 都是简单的聚合操作,不需要自定义规则
AggregateFunction<IN, ACC, OUT>
IN是输入类型,ACC是中间聚合状态类型,OUT是输出类型,是聚合统计当前窗口的数据
  • 滚动窗口聚合案例
/**
 * @author lixiang
 * Tumbling-Window滚动窗口
 */
public class FlinkWindow1Demo {
    public static void main(String[] args) throws Exception {
        //构建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        DataStream<VideoOrderDO> ds = env.addSource(new VideoOrderSource());

        WindowedStream<VideoOrderDO, String, TimeWindow> stream = ds.keyBy(new KeySelector<VideoOrderDO, String>() {
            @Override
            public String getKey(VideoOrderDO value) throws Exception {
                return value.getTitle();
            }
        }).window(TumblingProcessingTimeWindows.of(Time.seconds(5)));

        stream.aggregate(new AggregateFunction<VideoOrderDO, Map<String, Object>, Map<String, Object>>() {

            //初始化累加器
            @Override
            public Map<String, Object> createAccumulator() {
                return new HashMap<>();
            }

            //聚合方式
            @Override
            public Map<String, Object> add(VideoOrderDO value, Map<String, Object> accumulator) {
                if (accumulator.size() == 0) {
                    accumulator.put("title", value.getTitle());
                    accumulator.put("money", value.getMoney());
                    accumulator.put("num", 1);
                    accumulator.put("createTime", value.getCreateTime());
                } else {
                    accumulator.put("title", value.getTitle());
                    accumulator.put("money", value.getMoney() + Integer.parseInt(accumulator.get("money").toString()));
                    accumulator.put("num", 1 + Integer.parseInt(accumulator.get("num").toString()));
                    accumulator.put("createTime", value.getCreateTime());
                }

                return accumulator;
            }

            //返回结果
            @Override
            public Map<String, Object> getResult(Map<String, Object> accumulator) {
                return accumulator;
            }
            //合并内容
            @Override
            public Map<String, Object> merge(Map<String, Object> a, Map<String, Object> b) {
                return null;
            }
        }).print();
        env.execute("Tumbling Window job");
    }
}

大数据流试计算引擎Flink篇_flink_54

8.2.WindowFunction全窗口函数

  • 全窗口函数
apply(new WindowFunction(){ })
  • 窗口先缓存该窗口所有元素,等窗口的全部数据收集起来后再触发条件计算
  • 常见的全窗口聚合函数 windowFunction(未来可能弃用)、processWindowFunction(可以获取到窗口上下文 更多信息,包括窗口信息)
IN是输入类型,OUT是输出类型,KEY是分组类型,W是时间窗 
WindowFunction<IN, OUT, KEY, W extends Window>
  • 案例实战
/**
 * @author lixiang
 * apply
 */
public class FlinkWindow2Demo {
    public static void main(String[] args) throws Exception {
        //构建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        DataStream<VideoOrderDO> ds = env.addSource(new VideoOrderSource());

        WindowedStream<VideoOrderDO, String, TimeWindow> stream = ds.keyBy(new KeySelector<VideoOrderDO, String>() {
            @Override
            public String getKey(VideoOrderDO value) throws Exception {
                return value.getTitle();
            }
        }).window(TumblingProcessingTimeWindows.of(Time.seconds(5)));

        stream.apply(new WindowFunction<VideoOrderDO, Map<String,Object>, String, TimeWindow>() {
            @Override
            public void apply(String key, TimeWindow timeWindow, Iterable<VideoOrderDO> iterable, Collector<Map<String, Object>> collector) throws Exception {
                List<VideoOrderDO> list = IterableUtils.toStream(iterable).collect(Collectors.toList());
                long sum = list.stream().collect(Collectors.summarizingInt(VideoOrderDO::getMoney)).getSum();
                Map<String,Object> map = new HashMap<>();
                map.put("sumMoney",sum);
                map.put("title",key);
                collector.collect(map);
            }
        }).print();
        env.execute("apply Window job");
    }
}

大数据流试计算引擎Flink篇_大数据_55

8.3.processWindowFunction全窗口函数

  • 全窗口函数
process(new ProcessWindowFunction(){})
  • 窗口先缓存该窗口所有元素,等窗口的全部数据收集起来后再触发条件计算
  • 常见的全窗口聚合函数 windowFunction(未来可能弃用)、processWindowFunction(可以获取到窗口上下文 更多信息,包括窗口信息)
IN是输入类型,OUT是输出类型,KEY是分组类型,W是时间窗 
ProcessWindowFunction<IN, OUT, KEY, W extends Window>
  • 案例实战
/**
 * @author lixiang
 * process-Window滚动窗口
 */
public class FlinkWindow3Demo {
    public static void main(String[] args) throws Exception {
        //构建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        DataStream<VideoOrderDO> ds = env.addSource(new VideoOrderSource());

        WindowedStream<VideoOrderDO, String, TimeWindow> stream = ds.keyBy(new KeySelector<VideoOrderDO, String>() {
            @Override
            public String getKey(VideoOrderDO value) throws Exception {
                return value.getTitle();
            }
        }).window(TumblingProcessingTimeWindows.of(Time.seconds(5)));

        stream.process(new ProcessWindowFunction<VideoOrderDO, Map<String,Object>, String, TimeWindow>() {
            @Override
            public void process(String key, ProcessWindowFunction<VideoOrderDO, Map<String, Object>, String, TimeWindow>.Context context, Iterable<VideoOrderDO> iterable, Collector<Map<String, Object>> collector) throws Exception {
                List<VideoOrderDO> list = IterableUtils.toStream(iterable).collect(Collectors.toList());
                long sum = list.stream().collect(Collectors.summarizingInt(VideoOrderDO::getMoney)).getSum();
                Map<String,Object> map = new HashMap<>();
                map.put("sumMoney",sum);
                map.put("title",key);
                collector.collect(map);
            }
        }).print();

        env.execute("process Window job");
    }
}

大数据流试计算引擎Flink篇_数据_56

窗口函数对比

  • 增量聚合
aggregate(new AggregateFunction(){});
  • 全窗口聚合
apply(new WindowFunction(){})

process(new ProcessWindowFunction(){}) //比WindowFunction功能强大

9.迟到无序数据处理watermark

9.1.Watermark简介和应用

(1)基本概念

  • Window:Window是处理无界流的关键,Windows将流拆分为一个个有限大小的buckets,可以可以在每一个buckets中进行计算
  • start_time,end_time:当Window时时间窗口的时候,每个window都会有一个开始时间和结束时间(前开后闭),这个时间是系统时间
  • event-time: 事件发生时间,是事件发生所在设备的当地时间,比如一个点击事件的时间发生时间,是用户点击操作所在的手机或电脑的时间
  • Watermarks:可以把他理解为一个水位线,等于evevtTime - delay(比如规定为20分钟),一旦Watermarks大于了某个window的end_time,就会触发此window的计算,Watermarks就是用来触发1window计算的。
  • Watermaker = 当前计算窗口最大的事件时间 - 允许乱序延迟的时间

推迟窗口触发的时间,实现方式:通过当前窗口中最大的eventTime-延迟时间所得到的Watermark与窗口原始触发时间进行对比,当Watermark大于窗口原始触发时间时则触发窗口执行!!!我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的,虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、分布式等原因,导致乱序的产生,所谓乱序,就是指Flink接收到的事件的先后顺序不是严格按照事件的Event Time顺序排列的。

大数据流试计算引擎Flink篇_java_57

那么此时出现一个问题,一旦出现乱序,如果只根据eventTime决定window的运行,我们不能明确数据是否全部到位,但又不能无限期的等下去,此时必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了,这个特别的机制,就是Watermark。

(2)Watermark水位线介绍

  • 由flink的某个operator操作生成后,就在整个程序中随event数据流转
  • With Periodic Watermarks(周期生成,可以定义一个最大允许乱序的时间,用的很多)
  • With Punctuated Watermarks(标点水位线,根据数据流中某些特殊标记事件来生成,相对少)
  • 衡量数据是否乱序的时间,什么时候不用等早之前的数据
  • 是一个全局时间戳,不是某一个key下的值
  • 是一个特殊字段,单调递增的方式,主要是和数据本身的时间戳做比较
  • 用来确定什么时候不再等待更早的数据了,可以触发窗口进行计算,忍耐是有限度的,给迟到的数据一些机会
  • 注意
  • Watermark 设置太小会影响数据准确性,设置太大会影响数据的实时性,更加会加重Flink作业的负担
  • 需要经过测试,和业务相关联,得出一个较合适的值即可
  • 触发计算后,其他窗口内数据再到达也被丢弃

9.2.Watermark案例实战

  • 概念很抽象,下面用一个案例给解释下watermark的作用。
  • 需求:每10s分组统计不同视频的成交总价,数据有乱序延迟,允许5秒的时间。

(1)时间工具类

public class TimeUtil {

    /**
     * 时间处理
     * @param date
     * @return
     */
    public static String toDate(Date date){
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        ZoneId zoneId = ZoneId.systemDefault();
        return formatter.format(date.toInstant().atZone(zoneId));
    }

    /**
     * 字符串转日期类型
     * @param time
     * @return
     */
    public static Date strToDate(String time){
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        LocalDateTime dateTime = LocalDateTime.parse(time, formatter);
        return Date.from(dateTime.atZone(ZoneId.systemDefault()).toInstant());
    }

    /**
     * 时间处理
     * @param date
     * @return
     */
    public static String format(long date){
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        ZoneId zoneId = ZoneId.systemDefault();
        return formatter.format(new Date(date).toInstant().atZone(zoneId));
    }

}

(2)Flink入口函数

/**
 * @author lixiang
 */
public class FlinkWaterDemo {
    public static void main(String[] args) throws Exception {

        //初始化环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度
        env.setParallelism(1);

        //监听socket输入
        DataStreamSource<String> source = env.socketTextStream("192.168.139.20", 8888);

        //一对多转换,将输入的字符串转成Tuple类型
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> flatMap = source.flatMap(new FlatMapFunction<String, Tuple3<String, String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple3<String, String, Integer>> out) throws Exception {
                String[] split = value.split(",");
                out.collect(new Tuple3<String,String,Integer>(split[0],split[1],Integer.parseInt(split[2])));
            }
        });

        //设置watermark,官方文档直接拿来的,注意修改自己的时间参数
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> watermarks = flatMap.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((element, recordTimestamp) -> {
                    return TimeUtil.strToDate(element.f1).getTime();
                }));

        //根据标题进行分组
        watermarks.keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() {
            @Override
            public String getKey(Tuple3<String, String, Integer> value) throws Exception {
                return value.f0;
            }
        }).window(TumblingEventTimeWindows.of(Time.seconds(10))).apply(new WindowFunction<Tuple3<String, String, Integer>, String, String, TimeWindow>() {
            //滚动窗口,10s一统计,全窗口函数
            @Override
            public void apply(String key, TimeWindow timeWindow, Iterable<Tuple3<String, String, Integer>> iterable, Collector<String> collector) throws Exception {
                List<String> eventTimeList = new ArrayList<>();
                int total = 0;
                for (Tuple3<String, String, Integer> order : iterable) {
                    eventTimeList.add(order.f1);
                    total = total + order.f2;
                }
                String outStr = "分组key:"+key+",总价:"+total+",窗口开始时间:"+TimeUtil.format(timeWindow.getStart())+",窗口结束时间:"+TimeUtil.format(timeWindow.getEnd())+",窗口所有事件时间:"+eventTimeList;
                collector.collect(outStr);
            }
        }).print();

        env.execute("watermark job");
    }
}

(3)测试数据,nc -lk 8888监听8888端口,一条一条的输入

[root@flink ~]# nc -lk 8888
java,2022-11-11 23:12:07,10
java,2022-11-11 23:12:11,10
java,2022-11-11 23:12:08,10
mysql,2022-11-11 23:12:13,20
java,2022-11-11 23:12:13,10
java,2022-11-11 23:12:17,10
java,2022-11-11 23:12:09,10
java,2022-11-11 23:12:20,10
java,2022-11-11 23:12:22,10
java,2022-11-11 23:12:25,10

大数据流试计算引擎Flink篇_事件驱动_58


大数据流试计算引擎Flink篇_大数据_59

9.3.二次兜底延迟数据处理

  • 超过了watermark的等待后,还有延迟数据到达怎么办?
  • 上一个案例我们发现,第七个窗口本是[0-10s)窗口的数据,但是[0-10s)窗口的数据已经被统计了,所以数据丢失了,这就需要allowedLateness 来做二次兜底延迟数据处理。
  • 编码很简单,只需要在开窗函数那设置allowedLateness即可

大数据流试计算引擎Flink篇_java_60

/**
 * @author lixiang
 */
public class FlinkWaterDemo {
    public static void main(String[] args) throws Exception {

        //初始化环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度
        env.setParallelism(1);

        //监听socket输入
        DataStreamSource<String> source = env.socketTextStream("192.168.139.20", 8888);

        //一对多转换,将输入的字符串转成Tuple类型
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> flatMap = source.flatMap(new FlatMapFunction<String, Tuple3<String, String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple3<String, String, Integer>> out) throws Exception {
                String[] split = value.split(",");
                out.collect(new Tuple3<String,String,Integer>(split[0],split[1],Integer.parseInt(split[2])));
            }
        });

        //设置watermark,官方文档直接拿来的,注意修改自己的时间参数
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> watermarks = flatMap.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((element, recordTimestamp) -> {
                    return TimeUtil.strToDate(element.f1).getTime();
                }));

        //根据标题进行分组
        watermarks.keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() {
            @Override
            public String getKey(Tuple3<String, String, Integer> value) throws Exception {
                return value.f0;
            }
        }).window(TumblingEventTimeWindows.of(Time.seconds(10)))
                //1min的容忍时间,即使时间段窗口被统计了,只要数据没有超过1min就可以再次被统计进去
                .allowedLateness(Time.minutes(1))
                .apply(new WindowFunction<Tuple3<String, String, Integer>, String, String, TimeWindow>() {
            //滚动窗口,10s一统计,全窗口函数
            @Override
            public void apply(String key, TimeWindow timeWindow, Iterable<Tuple3<String, String, Integer>> iterable, Collector<String> collector) throws Exception {
                List<String> eventTimeList = new ArrayList<>();
                int total = 0;
                for (Tuple3<String, String, Integer> order : iterable) {
                    eventTimeList.add(order.f1);
                    total = total + order.f2;
                }
                String outStr = "分组key:"+key+",总价:"+total+",窗口开始时间:"+TimeUtil.format(timeWindow.getStart())+",窗口结束时间:"+TimeUtil.format(timeWindow.getEnd())+",窗口所有事件时间:"+eventTimeList;
                collector.collect(outStr);
            }
        }).print();

        env.execute("watermark job");
    }
}
  • 测试

大数据流试计算引擎Flink篇_事件驱动_61

9.4.最后的兜底延迟数据处理

  • watermark先输出,然后配置allowedLateness 再延长时间,然后到了后更新之前的窗口数据。
  • 数据超过了allowedLateness 后,怎么办?这会就用侧输出流 SideOutput,最终的一个兜底。
  • 侧输出流不会在统计到之前的窗口上,类似于独立存储起来,就是说没有被统计的数据会被单独存放在一个容器中,自定义的去进行最终一致性的操作。我们的数据输出会存放到redis或者mysql,侧输出的那一部分也存放在redis或者mysql,这样等统计之后,我们可以手动的讲结果进行整合。
  • 编码实战

大数据流试计算引擎Flink篇_java_62

/**
 * @author lixiang
 */
public class FlinkWaterDemo {
    public static void main(String[] args) throws Exception {

        //初始化环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度
        env.setParallelism(1);

        //监听socket输入
        DataStreamSource<String> source = env.socketTextStream("192.168.139.20", 8888);

        //一对多转换,将输入的字符串转成Tuple类型
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> flatMap = source.flatMap(new FlatMapFunction<String, Tuple3<String, String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple3<String, String, Integer>> out) throws Exception {
                String[] split = value.split(",");
                out.collect(new Tuple3<String,String,Integer>(split[0],split[1],Integer.parseInt(split[2])));
            }
        });

        //设置watermark,官方文档直接拿来的,注意修改自己的时间参数
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> watermarks = flatMap.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((element, recordTimestamp) -> {
                    return TimeUtil.strToDate(element.f1).getTime();
                }));

        //new 一个OutputTag Bean
        OutputTag<Tuple3<String,String,Integer>> lateData = new OutputTag<Tuple3<String,String,Integer>>("lateData"){};

        //根据标题进行分组
        SingleOutputStreamOperator<String> operator = watermarks.keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple3<String, String, Integer> value) throws Exception {
                        return value.f0;
                    }
                }).window(TumblingEventTimeWindows.of(Time.seconds(10)))
                //1min的容忍时间,即使时间段窗口被统计了,只要数据没有超过1min就可以再次被统计进去
                .allowedLateness(Time.minutes(1))
                //侧输入,最后的兜底数据
                .sideOutputLateData(lateData)
                .apply(new WindowFunction<Tuple3<String, String, Integer>, String, String, TimeWindow>() {
                    //滚动窗口,10s一统计,全窗口函数
                    @Override
                    public void apply(String key, TimeWindow timeWindow, Iterable<Tuple3<String, String, Integer>> iterable, Collector<String> collector) throws Exception {
                        List<String> eventTimeList = new ArrayList<>();
                        int total = 0;
                        for (Tuple3<String, String, Integer> order : iterable) {
                            eventTimeList.add(order.f1);
                            total = total + order.f2;
                        }
                        String outStr = "分组key:" + key + ",总价:" + total + ",窗口开始时间:" + TimeUtil.format(timeWindow.getStart()) + ",窗口结束时间:" + TimeUtil.format(timeWindow.getEnd()) + ",窗口所有事件时间:" + eventTimeList;
                        collector.collect(outStr);
                    }
                });

        operator.print();

        //侧输出流数据
        operator.getSideOutput(lateData).print();

        env.execute("watermark job");
    }
}
  • 测试
[root@flink ~]# nc -lk 8888
java,2022-11-11 23:12:07,10
java,2022-11-11 23:12:11,10
java,2022-11-11 23:12:08,10
mysql,2022-11-11 23:12:13,20
java,2022-11-11 23:12:13,10
java,2022-11-11 23:12:17,10
java,2022-11-11 23:12:09,10
java,2022-11-11 23:12:20,10
java,2022-11-11 23:14:22,10 
java,2022-11-11 23:12:25,10 #设置一个超过1分钟的数据,测试

大数据流试计算引擎Flink篇_大数据_63

9.5.Flink多层保证措施归纳

(1)如何保证在需要的窗口内获取指定的数据?数据有乱序延迟

  • flink采用watermark、allowedLateness()、sideOutputLateData()三个机制来保证获取数据。
  • watermark的作用是防止出现延迟乱序,允许等待一会在触发窗口计算。
  • allowLateness,是将窗口关闭时间在延迟一段时间,允许有一个最大迟到时间,allowLateness的数据会重新触发窗口计算
  • sideOutPut是最后的兜底操作,超过allowLateness后,窗口已经彻底关闭,就会把数据放到侧输出流,侧输出流OutputTag tag = new OutputTag(){},由于泛型擦除的问题,需要重写方法,加花括号。

(2)应用场景:实时监控平台

  • 可以用watermark及时输出数据
  • allowLateness 做短期的更新迟到数据
  • sideOutPut做兜底更新保证数据准确性

(3)总结Flink的机制

  • 第一层 窗口window 的作用是从DataStream数据流里指定范围获取数据。
  • 第二层 watermark的作用是防止数据出现乱序延迟,允许窗口等待延迟数据达到,再触发计算
  • 第三层 allowLateness 会让窗口关闭时间再延迟一段时间, 如果还有数据达到,会局部修复数据并主动更新窗口的数据输出
  • 第四层 sideOutPut侧输出流是最后兜底操作,在窗口已经彻底关闭后,所有过期延迟数据放到侧输出流,可以单独获取,存储到某个地方再批量更新之前的聚合的数据
  • 注意
  • Flink 默认的处理方式直接丢弃迟到的数据
  • sideOutPut还可以进行分流功能
  • DataStream没有getSideOutput方法,SingleOutputStreamOperator才有

(4)版本弃用API

新接口,`WatermarkStrategy`,`TimestampAssigner` 和 `WatermarkGenerator` 因为其对时间戳和 watermark 等重点的抽象和分离很清晰,并且还统一了周期性和标记形式的 watermark 生成方式

新接口之前是用AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks ,现在可以弃用了

10.Flink状态State管理和Checkpoint

10.1.Flink的状态State管理简介

(1)什么是State状态

  • 数据流处理离不开状态管理,比如窗口聚合统计、去重、排序等
  • 是一个Operator的运行的状态\历史值,是维护在内存中
  • 流程:一个算子的子任务接收输入流,获取对应的状态,计算新的结果然后把结果更新到状态里面

(2)有状态和无状态介绍

  • 无状态计算: 同个数据进到算子里面多少次,都是一样的输出,比如 filter
  • 有状态计算:需要考虑历史状态,同个输入会有不同的输出,比如sum、reduce聚合操作

(3)状态管理分类

  • ManagedState
  • Flink管理,自动存储恢复
  • 细分两类
  • Keyed State 键控状态(用的多)
  • 有KeyBy才用这个,仅限用在KeyStream中,每个key都有state ,是基于KeyedStream上的状态
  • 一般是用richFlatFunction,或者其他richfunction里面,在open()声明周期里面进行初始化
  • ValueState、ListState、MapState等数据结构
  • Operator State 算子状态(用的少,部分source会用)
  • ListState、UnionListState、BroadcastState等数据结构
  • RawState
  • 用户自己管理和维护
  • 存储结构:二进制数组
  • State数据结构(状态值可能存在内存、磁盘、DB或者其他分布式存储中)
  • ValueState 简单的存储一个值(ThreadLocal / String)
  • ValueState.value()
  • ValueState.update(T value)
  • ListState 列表
  • ListState.add(T value)
  • ListState.get() //得到一个Iterator
  • MapState 映射类型
  • MapState.get(key)
  • MapState.put(key, value)

10.2.Flink状态State后端存储讲解

从Flink 1.13开始,社区重新设计了其公共状态后端类,以帮助用户更好地理解本地状态存储和检查点存储的分离 用户可以迁移现有应用程序以使用新 API,⽽不会丢失任何状态或⼀致性。

(1)Flink内置了以下这些开箱即用的state backends :

  • (新版)HashMapStateBackend、EmbeddedRocksDBStateBackend
  • 如果没有其他配置,系统将使用 HashMapStateBackend。
  • (旧版)MemoryStateBackend、FsStateBackend、RocksDBStateBackend
  • 如果不设置,默认使用 MemoryStateBackend

(2)State状态详解

  • HashMapStateBackend 保存数据在内部作为Java堆的对象。
  • 键/值状态和窗口操作符持有哈希表,用于存储值、触发器等
  • 非常快,因为每个状态访问和更新都对 Java 堆上的对象进行操作
  • 但是状态大小受集群内可用内存的限制
  • 场景:
  • 具有大状态、长窗口、大键/值状态的作业。
  • 所有高可用性设置。
  • EmbeddedRocksDBStateBackend 在RocksDB数据库中保存状态数据
  • 该数据库(默认)存储在 TaskManager 本地数据目录中
  • 与HashMapStateBackend在java存储 对象不同,数据存储为序列化的字节数组
  • RocksDB可以根据可用磁盘空间进行扩展,并且是唯一支持增量快照的状态后端。
  • 但是每个状态访问和更新都需要(反)序列化并可能从磁盘读取,这导致平均性能比内存状态后端慢一个数量级
  • 场景
  • 具有非常大状态、长窗口、大键/值状态的作业。
  • 所有高可用性设置
  • 旧版的状态管理
MemoryStateBackend(内存,不推荐在生产场景使用)
FsStateBackend(文件系统上,本地文件系统、HDFS, 性能更好,常用)
RocksDBStateBackend (无需担心 OOM 风险,是大部分时候的选择)

(3)配置方式

方式一:可以flink-conf.yaml使用配置键在中配置默认状态后端state.backend。

配置条目的可能值是hashmap (HashMapStateBackend)、rocksdb (EmbeddedRocksDBStateBackend) 
或实现状态后端工厂StateBackendFactory的类的完全限定类名

#全局配置例子一
# The backend that will be used to store operator state checkpoints
state.backend: hashmap

# Optional, Flink will automatically default to JobManagerCheckpointStorage
# when no checkpoint directory is specified.
state.checkpoint-storage: jobmanager

#全局配置例子二
state.backend: rocksdb
state.checkpoints.dir: file:///checkpoint-dir/

# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem

方式二:代码 单独job配置例子

//代码配置一
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());

//代码配置二
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
//或者
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));
  • 备注:使用 RocksDBStateBackend 需要加依赖
<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.version}</artifactId>
            <version>1.13.1</version>
</dependency>

10.3.Flink的状态State管理实战

  • sum()、maxBy() 等函数底层源码也是有ValueState进行状态存储
  • 需求:
  • 根据订单进行分组,统计找出每个商品最大的订单成交额
  • 不用maxBy实现,用ValueState实现
  • 编码实战
/**
 * 使用valueState实现maxBy功能,统计分组内订单金额最高的订单
 * @author lixiang
 */
public class FlinkStateDemo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataStream<String> ds = env.socketTextStream("192.168.139.20", 8888);

        DataStream<Tuple3<String, String, Integer>> flatMapDS = ds.flatMap(new RichFlatMapFunction<String, Tuple3<String, String, Integer>>() {

            @Override
            public void flatMap(String value, Collector<Tuple3<String, String, Integer>> out) throws Exception {
                String[] arr = value.split(",");
                out.collect(Tuple3.of(arr[0], arr[1], Integer.parseInt(arr[2])));
            }
        });

        //一定要key by后才可以使用键控状态ValueState
        SingleOutputStreamOperator<Tuple2<String, Integer>> maxVideoOrder = flatMapDS.keyBy(new KeySelector<Tuple3<String,String,Integer>, String>() {
            @Override
            public String getKey(Tuple3<String, String, Integer> value) throws Exception {
                return value.f0;
            }
        }).map(new RichMapFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>>() {
            private ValueState<Integer> valueState = null;
            @Override
            public void open(Configuration parameters) throws Exception {
                valueState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("total", Integer.class));
            }
            @Override
            public Tuple2<String, Integer> map(Tuple3<String, String, Integer> tuple3) throws Exception {
                // 取出State中的最大值
                Integer stateMaxValue = valueState.value();

                Integer currentValue = tuple3.f2;

                if (stateMaxValue == null || currentValue > stateMaxValue) {
                    //更新状态,把当前的作为新的最大值存到状态中
                    valueState.update(currentValue);
                    return Tuple2.of(tuple3.f0, currentValue);
                } else {
                    //历史值更大
                    return Tuple2.of(tuple3.f0, stateMaxValue);
                }
            }
        });
        maxVideoOrder.print();
        env.execute("valueState job");
    }
}
  • 测试
[root@flink ~]# nc -lk 8888
java,2022-11-11 23:12:07,10
java,2022-11-11 23:12:11,10
java,2022-11-11 23:12:08,30
mysql,2022-11-11 23:12:13,20
java,2022-11-11 23:12:13,10

大数据流试计算引擎Flink篇_flink_64

10.4.Flink的Checkpoint-SavePoint

  • 什么是Checkpoint 检查点
  • Flink中所有的Operator的当前State的全局快照
  • 默认情况下 checkpoint 是禁用的
  • Checkpoint是把State数据定时持久化存储,防止丢失
  • 手工调用checkpoint,叫 savepoint,主要是用于flink集群维护升级等
  • 底层使用了Chandy-Lamport 分布式快照算法,保证数据在分布式环境下的一致性
  • 开箱即用,Flink 捆绑了这些检查点存储类型:
  • 作业管理器检查点存储 JobManagerCheckpointStorage
  • 文件系统检查点存储 FileSystemCheckpointStorage
  • 配置
//全局配置checkpoints
state.checkpoints.dir: hdfs:///checkpoints/

//作业单独配置checkpoints
env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints-data/");
//全局配置savepoint
state.savepoints.dir: hdfs:///flink/savepoints
  • Savepoint 与 Checkpoint 的不同之处
  • 类似于传统数据库中的备份与恢复日志之间的差异
  • Checkpoint 的主要目的是为意外失败的作业提供【重启恢复机制】,
  • Checkpoint 的生命周期由 Flink 管理,即 Flink 创建,管理和删除 Checkpoint - 无需用户交互
  • Savepoint 由用户创建,拥有和删除, 主要是【升级 Flink 版本】,调整用户逻辑
  • 除去概念上的差异,Checkpoint 和 Savepoint 的当前实现基本上使用相同的代码并生成相同的格式
  • 端到端(end-to-end)状态一致性
数据一致性保证都是由流处理器实现的,也就是说都是在Flink流处理器内部保证的
在真实应用中,了流处理器以外还包含了数据源(例如Kafka、Mysql)和输出到持久化系统(Kafka、Mysql、Hbase、CK)
端到端的一致性保证,是意味着结果的正确性贯穿了整个流处理应用的各个环节,每一个组件都要保证自己的一致性。
  • Source
  • 需要外部数据源可以重置读取位置,当发生故障的时候重置偏移量到故障之前的位置
  • 内部
  • 依赖Checkpoints机制,在发生故障的时可以恢复各个环节的数据
  • Sink:
  • 当故障恢复时,数据不会重复写入外部系统,常见的就是 幂等和事务写入(和checkpoint配合)

10.5.Flink的Checkpoint代码配置

//两个检查点之间间隔时间,默认是0,单位毫秒
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
//Checkpoint过程中出现错误,是否让整体任务都失败,默认值为0,表示不容忍任何Checkpoint失败
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);

//Checkpoint是进行失败恢复,当一个 Flink 应用程序失败终止、人为取消等时,它的 Checkpoint 就会被清除
//可以配置不同策略进行操作
// DELETE_ON_CANCELLATION: 当作业取消时,Checkpoint 状态信息会被删除,因此取消任务后,不能从 Checkpoint 位置进行恢复任务
// RETAIN_ON_CANCELLATION(多): 当作业手动取消时,将会保留作业的 Checkpoint 状态信息,要手动清除该作业的 Checkpoint 状态信息
       env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

//Flink 默认提供 Extractly-Once 保证 State 的一致性,还提供了 Extractly-Once,At-Least-Once 两种模式,
// 设置checkpoint的模式为EXACTLY_ONCE,也是默认的,
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

//设置checkpoint的超时时间, 如果规定时间没完成则放弃,默认是10分钟
env.getCheckpointConfig().setCheckpointTimeout(60000);

//设置同一时刻有多少个checkpoint可以同时执行,默认为1就行,以避免占用太多正常数据处理资源
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

//设置了重启策略, 作业在失败后能自动恢复,失败后最多重启3次,每次重启间隔10s
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));

11.Flink复杂事件处理CEP

11.1.Flink复杂事件处理CEP介绍

(1)什么是FlinkCEP

  • CEP全称 Complex event processing 复杂事件处理
  • FlinkCEP 是在 Flink 之上实现的复杂事件处理(CEP)库
  • 擅长高吞吐、低延迟的处理,市场上有多种CEP的解决方案,例如Spark,但是Flink专门类库更方便使用

(2)FlinkCEP用途

  • 检测和发现无边界事件流中多个记录的关联规则,得到满足规则的复杂事件
  • 允许业务定义要从输入流中提取的复杂模式序列

(3)FlinkCEP使用流程

  • 定义pattern
  • pattern应用到数据流,得到模式流
  • 从模式流 获取结果

(4)CEP并不包含在flink中,使用前需要自己导入

<dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-cep-scala_2.11</artifactId>
     <version>1.7.0</version>
</dependency>

11.2.Flink的复杂事件CEP常见概念

(1)模式(Pattern):定义处理事件的规则

  • 三种模式PatternAPI
  • 个体模式(Individual Patterns):组成复杂规则的每一个单独的模式定义,就是个体模式
  • 组合模式(Combining Patterns):很多个体模式组合起来,形成组合模式
  • 模式组(Groups of Patterns):将一个组合模式作为条件嵌套在个体模式里,就是模式组
  • 近邻模式
  • 严格近邻:期望所有匹配事件严格地一个接一个出现,中间没有任何不匹配的事件, API是.next()
  • 宽松近邻:允许中间出现不匹配的事件,API是.followedBy()
  • 非确定性宽松近邻:可以忽略已经匹配的条件,API是followedByAny()
  • 指定时间约束:指定模式在多长时间内匹配有效,API是within
  • 如果您不希望事件类型直接跟随另一个,notNext()
  • 如果您不希望事件类型介于其他两种事件类型之间,notFollowedBy()
  • 模式分类
  • 单次模式:接收一次一个事件
  • 循环模式:接收一个或多个事件

(2)其他参数

  • times:指定固定的循环执行次数
  • greedy:贪婪模式,尽可能多触发
  • oneOrMore:指定触发一次或多次
  • timesOrMore:指定触发固定以上的次数
  • optional:要么不触发要么触发指定的次数

11.3.Flink复杂事件CEP案例实战

  • 需求:同个账号,在5秒内连续登录失败2次,则认为存在而已登录问题
  • 数据格式 李祥,2022-11-11 12:01:01,-1
/**
 * cep-demo
 * @author lixiang
 */
public class FlinkCEPDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> ds = env.socketTextStream("192.168.139.20",8888);
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> flatMapDS = ds.flatMap(new FlatMapFunction<String, Tuple3<String, String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple3<String, String, Integer>> out) throws Exception {
                String[] arr = value.split(",");
                out.collect(Tuple3.of(arr[0], arr[1], Integer.parseInt(arr[2])));
            }
        });
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> watermarks = flatMapDS.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Integer>>forMonotonousTimestamps()
                .withTimestampAssigner((event, timestamp) -> TimeUtil.strToDate(event.f1).getTime()));

        KeyedStream<Tuple3<String, String, Integer>, String> keyedStream = watermarks.keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() {
            @Override
            public String getKey(Tuple3<String, String, Integer> value) throws Exception {
                return value.f0;
            }
        });

        //定义模式
        Pattern<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>> pattern = Pattern.<Tuple3<String, String, Integer>>
                        begin("firstTimeLogin")
                .where(new SimpleCondition<Tuple3<String, String, Integer>>() {
                    @Override
                    public boolean filter(Tuple3<String, String, Integer> value) throws Exception {
                        return value.f2 == -1;
                    }
                })
                .next("secondTimeLogin")
                .where(new SimpleCondition<Tuple3<String, String, Integer>>() {
                    @Override
                    public boolean filter(Tuple3<String, String, Integer> value) throws Exception {
                        return value.f2 == -1;
                    }
                }).within(Time.seconds(5));

        //匹配检查
        PatternStream<Tuple3<String, String, Integer>> patternStream = CEP.pattern(keyedStream, pattern);
        SingleOutputStreamOperator<Tuple3<String, String, String>> select = patternStream.select(new PatternSelectFunction<Tuple3<String, String, Integer>, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> select(Map<String, List<Tuple3<String, String, Integer>>> map) throws Exception {
                Tuple3<String, String, Integer> firstLoginFail = map.get("firstTimeLogin").get(0);
                Tuple3<String, String, Integer> secondLoginFail = map.get("secondTimeLogin").get(0);
                return Tuple3.of(firstLoginFail.f0, firstLoginFail.f1, secondLoginFail.f1);
            }
        });

        select.print("匹配结果");

        env.execute("CEP job");
    }
}
  • 测试
张三,2022-11-11 12:01:01,-1
李四,2022-11-11 12:01:10,-1
李四,2022-11-11 12:01:11,-1
张三,2022-11-11 12:01:13,-1
李四,2022-11-11 12:01:14,-1
李四,2022-11-11 12:01:15,1
张三,2022-11-11 12:01:16,-1
李四,2022-11-11 12:01:17,-1
张三,2022-11-11 12:01:20,1

12.Flink项目打包插件+部署

12.1.Flink服务端多种部署模式

Flink 部署方式是灵活,主要是对Flink计算时所需资源的管理方式不同

  • 文档:https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/deployment/overview/
  • Local 本地部署,直接启动进程,适合调试使用
  • 直接部署启动服务
  • Standalone Cluster集群部署,flink自带集群模式
  • Hadoop YARN 计算资源统一由Hadoop YARN管理资源进行调度,按需使用提高集群的资源利用率
  • Kubernetes 部署
  • Docker部署

大数据流试计算引擎Flink篇_java_65

12.2.Flink本地模式部署Linux服务器

(1)安装JDK8环境

(1)上传jdk1.8安装包,解压到指目录
tar -xvf jdk-8u181-linux-x64.tar.gz -C /usr/local/

(2)查看解压后的文件
[root@flink ~]# cd /usr/local
[root@flink local]# ls
bin  etc  flink-1.13.1  games  include  jdk1.8.0_181  lib  lib64  libexec  sbin  share  src

(3)jdk1.8.0_181重命名为jdk1.8
mv jdk1.8.0_181/ jdk1.8

(4)配置环境变量
vi /etc/profile
添加配置:
JAVA_HOME=/usr/local/jdk1.8
CLASSPATH=$JAVA_HOME/lib/
PATH=$PATH:$JAVA_HOME/bin
export PATH JAVA_HOME CLASSPATH

(5)刷新配置
source /etc/profile

(6)查看java环境
[root@flink ~]# java -version
java version "1.8.0_181"
Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)
  • 注意:如果不安装jdk环境的话启动flink会报这个错误
Please specify JAVA_HOME. Either in Flink config ./conf/flink-conf.yaml or as system-wide JAVA_HOME.

(2)准备flink环境

  • Flink下载地址:https://flink.apache.org/zh/downloads.html
tar -xvf flink-1.13.1-bin-scala_2.12.tgz -C /usr/local/
调整配置文件:conf/flink-conf.yaml

#web ui 端口
rest.port=8081

#调整jobmanager和taskmanager的大小,根据自己的机器进行调整
jobmanager.memory.process.size: 256m
taskmanager.memory.process.size: 256m

大数据流试计算引擎Flink篇_数据_66

本地模式用到这两个脚本
start-cluster.sh
stop-cluster.sh

启动本地模式:./start-cluster.sh
注意这会可能会报错:
The derived from fraction jvm overhead memory (19.200mb (20132659 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead

原因分析:
flink最少需要192M的内存才能启动,jdk1.8之后堆内存采用的是元空间,元空间是根据内存大小自动分配的。如果元空间给的太小,flink将会启动不起来。

调整如下配置:conf/flink-conf.yaml
taskmanager.memory.process.size: 512m
 
taskmanager.memory.framework.heap.size: 64m
taskmanager.memory.framework.off-heap.size: 64m
taskmanager.memory.jvm-metaspace.size: 64m
taskmanager.memory.jvm-overhead.fraction: 0.2
taskmanager.memory.jvm-overhead.min: 16m
taskmanager.memory.jvm-overhead.max: 64m
 
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 1mb
taskmanager.memory.network.max: 256mb
  • 重新启动,成功

大数据流试计算引擎Flink篇_java_67

  • 访问web UI ip:8081

大数据流试计算引擎Flink篇_数据_68


好博客就要一起分享哦!分享海报

此处可发布评论

评论(0展开评论

暂无评论,快来写一下吧

展开评论

您可能感兴趣的博客

客服QQ 1913284695