云计算百科
云计算领域专业知识百科平台

大数据技术之 Flink 优化(资源配置调优)

本博客总结为B站尚硅谷大数据Flink2.0调优,Flink性能优化视频的笔记总结。尚硅谷https://so.csdn.net/so/search?q=%E5%B0%9A%E7%A1%85%E8%B0%B7&spm=1001.2101.3001.7020大数据Flink2.0调优,Flink性能优化https://www.bilibili.com/video/BV1Q5411f76P

资源配置调优

        Flink 性能调优的第一步,就是为任务分配合适的资源,在一定范围内,增加资源的分配与性能的提升是成正比的,实现了最优的资源配置后,在此基础上再考虑进行后面论述的性能调优策略。

提交方式主要是 yarn-per-job,资源的分配在使用脚本提交 Flink 任务时进行指定。


标准的 Flink 任务提交脚本(Generic CLI 模式)

从 1.11 开始,增加了通用客户端模式,参数使用-D <property=value>指定

bin/flink run \\
-t yarn-per-job \\
-d \\
-p 5 \\ 指定并行度
-Dyarn.application.queue=test \\ 指定 yarn 队列
-Djobmanager.memory.process.size=1024mb \\ 指定 JM 的总进程大小
-Dtaskmanager.memory.process.size=1024mb \\ 指定每个 TM 的总进程大小
-Dtaskmanager.numberOfTaskSlots=2 \\ 指定每个 TM 的 slot 数
-c com.atguigu.flink.tuning.UvDemo \\
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

参数列表:

https://ci.apache.org/projects/flink/flink-docs-release-1.13/deployment/config.html 

内存设置

TaskManager 内存模型

1、内存模型详解

 
JVM 特定内存:
JVM 本身使用的内存,包含 JVM 的 metaspace 和 over-head

1)JVM metaspace:JVM 元空间

taskmanager.memory.jvm-metaspace.size,默认 256mb

2)JVM over-head 执行开销:JVM 执行时自身所需要的内容,包括线程堆栈、IO、编译

缓存等所使用的内存。

taskmanager.memory.jvm-overhead.fraction,默认 0.1

taskmanager.memory.jvm-overhead.min,默认 192mb

taskmanager.memory.jvm-overhead.max,默认 1gb

总进程内存*fraction,如果小于配置的 min(或大于配置的 max)大小,则使用 min/max
大小

框架内存:
Flink 框架,即 TaskManager 本身所占用的内存,
不计入 Slot 的资源中。

堆内:taskmanager.memory.framework.heap.size,默认 128MB

堆外:taskmanager.memory.framework.off-heap.size,默认 128MB

Task 内存:
Task 执行用户代码时所使用的内存

堆内:taskmanager.memory.task.heap.size,默认 none,由 Flink 内存扣除掉其他部分

的内存得到。

堆外:taskmanager.memory.task.off-heap.size,默认 0,表示不使用堆外内存

网络内存:
网络数据交换所使用的堆外内存大小,如网络数据交换缓冲区

堆外:taskmanager.memory.network.fraction,默认 0.1

taskmanager.memory.network.min,默认 64mb

taskmanager.memory.network.max,默认 1gb

Flink 内存*fraction,如果小于配置的 min(或大于配置的 max)大小,则使用 min/max

大小

托管内存:
用于 RocksDB State Backend 的本地内存和批的排序、哈希表、缓存中间结果。

堆外:taskmanager.memory.managed.fraction,默认 0.4

taskmanager.memory.managed.size,默认 none

如果 size 没指定,则等于 Flink 内存*fraction


案例分析

基于Yarn模式,一般参数指定的是总进程内存,taskmanager.memory.process.size,

比如指定为 4G,每一块内存得到大小如下:

(1)计算 Flink 内存

        JVM 元空间 256m

        JVM 执行开销: 4g*0.1=409.6m,在[192m,1g]之间,最终结果 409.6m

        Flink 内存=4g-256m-409.6m=3430.4m

(2)网络内存=3430.4m*0.1=343.04m,在[64m,1g]之间,最终结果 343.04m

(3)托管内存=3430.4m*0.4=1372.16m

(4)框架内存,堆内和堆外都是 128m

(5)Task 堆内内存=3430.4m-128m-128m-343.04m-1372.16m=1459.2m

所以进程内存给多大,每一部分内存需不需要调整,可以看内存的使用率来调整。


2 生产资源配置示例

bin/flink run \\
-t yarn-per-job \\
-d \\
-p 5 \\ 指定并行度
-Dyarn.application.queue=test \\ 指定 yarn 队列
-Djobmanager.memory.process.size=2048mb \\ JM2~4G 足够
-Dtaskmanager.memory.process.size=4096mb \\ 单个 TM2~8G 足够
-Dtaskmanager.numberOfTaskSlots=2 \\ 与容器核数 1core:1slot 或 2core:1slot
-c com.atguigu.flink.tuning.UvDemo \\
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

Flink 是实时流处理,关键在于资源情况能不能抗住高峰时期每秒的数据量,通常用 QPS/TPS 来描述数据情况。


合理利用 cpu 资源

        Yarn 的容量调度器默认情况下是使用“DefaultResourceCalculator”分配策略,只根据内存调度资源,所以在 Yarn 的资源管理页面上看到每个容器的 vcore 个数还是 1。

        可以修改策略为 DominantResourceCalculator,该资源计算器在计算资源的时候会综合考虑 cpu 和内存的情况。在 capacity-scheduler.xml 中修改属性:
<property>
<name>yarn.scheduler.capacity.resource-calculator</name>
<!– <value>org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator</value> –>
<value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
</property>

使用 DefaultResourceCalculator 策略

bin/flink run \\
-t yarn-per-job \\
-d \\
-p 5 \\
-Drest.flamegraph.enabled=true \\
-Dyarn.application.queue=test \\
-Djobmanager.memory.process.size=1024mb \\
-Dtaskmanager.memory.process.size=4096mb \\
-Dtaskmanager.numberOfTaskSlots=2 \\
-c com.atguigu.flink.tuning.UvDemo \\
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

可以看到一个容器只有一个 vcore:

使用 DominantResourceCalculator 策略

修改后 yarn 配置后,分发配置并重启 yarn,再次提交 flink 作业:
bin/flink run \\
-t yarn-per-job \\
-d \\
-p 5 \\
-Drest.flamegraph.enabled=true \\
-Dyarn.application.queue=test \\
-Djobmanager.memory.process.size=1024mb \\
-Dtaskmanager.memory.process.size=4096mb \\
-Dtaskmanager.numberOfTaskSlots=2 \\
-c com.atguigu.flink.tuning.UvDemo \\
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

看到容器的 vcore 数变了:

JobManager1 个,占用 1 个容器,vcore=1

TaskManager3 个,占用 3 个容器,每个容器 vcore=2,总 vcore=2*3=6,因为
默认 单个容器的 vcore 数=单 TM 的 slot 数

使 用 DominantResourceCalculator 策 略 并 指 定 容 器 vcore 数

指定 yarn 容器的 vcore 数,提交:
bin/flink run \\
-t yarn-per-job \\
-d \\
-p 5 \\
-Drest.flamegraph.enabled=true \\
-Dyarn.application.queue=test \\
-Dyarn.containers.vcores=3 \\
-Djobmanager.memory.process.size=1024mb \\
-Dtaskmanager.memory.process.size=4096mb \\
-Dtaskmanager.numberOfTaskSlots=2 \\
-c com.atguigu.flink.tuning.UvDemo \\
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

JobManager1 个,占用 1 个容器,vcore=1

TaskManager3 个,占用 3 个容器,每个容器
vcore =3
,总 vcore=3*3=9


并行度设置

全局并行度计算

        开发完成后,先进行
压测
。任务并行度给 10 以下,测试单个并行度的处理上限。然后
总 QPS/单并行度的处理能力 = 并行度

        开发完 Flink 作业,压测的方式很简单,先在 kafka 中积压数据,之后开启 Flink 任务, 出现反压,就是处理瓶颈。相当于水库先积水,一下子泄洪。

        不能只从 QPS 去得出并行度,因为有些字段少、逻辑简单的任务,单并行度一秒处理几万条数据。而有些数据字段多,处理逻辑复杂,单并行度一秒只能处理 1000 条数据。

        最好根据高峰期的 QPS 压测,
并行度*1.2 倍
,富余一些资源。

Source 端并行度的配置

        数据源端是 Kafka,Source 的并行度设置为 Kafka 对应 Topic 的分区数。

        如果已经等于 Kafka 的分区数,消费速度仍跟不上数据生产速度,考虑下 Kafka 要扩大分区,同时调大并行度等于分区数。

        Flink 的一个并行度可以处理一至多个分区的数据,如果并行度多于 Kafka 的分区数,那么就会造成有的并行度空闲,浪费资源。

Transform 端并行度的配置


Keyby 之前的算子

        一般不会做太重的操作,都是比如 map、filter、flatmap 等处理较快的算子,并行度 可以和 source 保持一致。


Keyby 之后的算子

        如果并发较大,建议设置并行度为 2 的整数次幂,例如:128、256、512; 小并发任务的并行度不一定需要设置成 2 的整数次幂;大并发任务如果没有 KeyBy,并行度也无需设置为 2 的整数次幂;

Sink 端并行度的配置

        Sink 端是数据流向下游的地方,可以根据
Sink 端的数据量

下游的服务抗压能力
进行评估。如果 Sink 端是 Kafka,可以设为 Kafka 对应 Topic 的分区数。

        Sink 端的数据量小,比较常见的就是监控告警的场景,并行度可以设置的小一些。

        Source 端的数据量是最小的,拿到 Source 端流过来的数据后做了细粒度的拆分,数据量不断的增加,到 Sink 端的数据量就非常大。那么在 Sink 到下游的存储中间件的时候就需要提高并行度。

        另外 Sink 端要与下游的服务进行交互,并行度还得根据下游的服务抗压能力来设置,如果在 Flink Sink 这端的数据量过大的话,且 Sink 处并行度也设置的很大,但下游的服务完全撑不住这么大的并发写入,可能会造成下游服务直接被写挂,所以最终还是要在 Sink
处的并行度做一定的权衡。

赞(0)
未经允许不得转载:网硕互联帮助中心 » 大数据技术之 Flink 优化(资源配置调优)
分享到: 更多 (0)

评论 抢沙发

评论前必须登录!