Jast blog Jast blog
首页
  • 《Ambari自定义开发教程》笔记
  • 《CDH教程》笔记
  • 《ClickHouse教程》笔记
  • 《HDFS教程》笔记
  • 《DolphinScheduler教程》笔记
  • 《Hbase教程》笔记
  • 《Iceberg教程》笔记
  • 《Hive教程》笔记
  • 《Flume教程》笔记
  • 《Kafka教程》笔记
  • 《Impala教程》笔记
  • 《Hue教程》笔记
  • 《Spark教程》笔记
  • 《Flink教程》笔记
  • 《Phoenix教程》笔记
  • 《ElasticSearch教程》笔记
  • 《Kylin教程》笔记
  • 《Storm教程》笔记
  • 《Yarn教程》笔记
  • 《Presto教程》笔记
  • 《图数据库教程》笔记
  • 《Kerberos教程》笔记
  • 《Maxwell教程》笔记
  • 《MinIO教程》笔记
  • 《DataX教程》笔记
  • 《Superset教程》笔记
  • 《IOTDB教程》笔记
  • 《大数据相关》笔记
  • 《PaddleNLP教程》笔记
  • 《Nginx教程》笔记
  • 《Java技术文档》
  • 《Maven教程》笔记
  • 《IDEA使用教程》
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档

Jast-zsh

如果你知道你要去哪里,全世界都会给你让路。
首页
  • 《Ambari自定义开发教程》笔记
  • 《CDH教程》笔记
  • 《ClickHouse教程》笔记
  • 《HDFS教程》笔记
  • 《DolphinScheduler教程》笔记
  • 《Hbase教程》笔记
  • 《Iceberg教程》笔记
  • 《Hive教程》笔记
  • 《Flume教程》笔记
  • 《Kafka教程》笔记
  • 《Impala教程》笔记
  • 《Hue教程》笔记
  • 《Spark教程》笔记
  • 《Flink教程》笔记
  • 《Phoenix教程》笔记
  • 《ElasticSearch教程》笔记
  • 《Kylin教程》笔记
  • 《Storm教程》笔记
  • 《Yarn教程》笔记
  • 《Presto教程》笔记
  • 《图数据库教程》笔记
  • 《Kerberos教程》笔记
  • 《Maxwell教程》笔记
  • 《MinIO教程》笔记
  • 《DataX教程》笔记
  • 《Superset教程》笔记
  • 《IOTDB教程》笔记
  • 《大数据相关》笔记
  • 《PaddleNLP教程》笔记
  • 《Nginx教程》笔记
  • 《Java技术文档》
  • 《Maven教程》笔记
  • 《IDEA使用教程》
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
  • Flink Standalone集群安装
  • Flink启动脚本
  • keyBy数据分配计算方法
    • 场景
    • 问题
    • 分析
      • 编写代码模拟分配
      • 验证
    • 解决
  • Flink流处理高级操作之时间语义
  • Flink流处理之窗口Window
  • Flink流处理之ProcessFunction
  • Chain分隔
  • Backpressured详细介绍
  • Flink消费Kafka
  • Flink操作MySQL
  • Flink自定义Connector-TableApi SQL
  • Flink使用异常处理
  • FlinkCDC
  • LinkageError异常处理
  • Flink日志Log4j发送到Kafka
  • 《Flink教程》笔记
Jast-zsh
2023-03-10
目录

keyBy数据分配计算方法

[toc]

# 场景

Flink keyBy是根据某一个字段Hash后除以6取余数,原以为每个subtask会平均处理数据量,结果发现没有相对平均处理。有三个subtask没有数据

# 问题

Flink代码keyBy后发现数据没有平均分配到每一个subtask中,数据分配比例大概为1,0,0,0,2,3 (六份数据),具体数据处理比例见下图

Records 为11,这11条数据是广播的数据,不是keyBy分配的数据

image-20220811133414034

# 分析

查询资料发现keyBy分组的算法在 KeyGroupRangeAssignment类中assignKeyToParallelOperator方法

源码如下(仅列出部分需要的):



    public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
        Preconditions.checkNotNull(key, "Assigned key must not be null!");
        return computeOperatorIndexForKeyGroup(
                maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
    }

public static int assignToKeyGroup(Object key, int maxParallelism) {
        Preconditions.checkNotNull(key, "Assigned key must not be null!");
        return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
    }

   public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
        return MathUtils.murmurHash(keyHash) % maxParallelism;
    }

 public static int computeOperatorIndexForKeyGroup(
            int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

通过源码分析,计算方法为:(MathUtils.murmurHash(key.hashCode()) % maxParallelism) * parallelism / maxParallelism

接下来继续分析每一个值具体是多少。

  • key

key不用说了就是我们keyBy的值

  • maxParallelism

    计算默认最大并行度的方法

    public static int computeDefaultMaxParallelism(int operatorParallelism) {
    
            checkParallelismPreconditions(operatorParallelism);
    
            return Math.min(
                    Math.max(
                            MathUtils.roundUpToPowerOfTwo(
                                    operatorParallelism + (operatorParallelism / 2)),
                            DEFAULT_LOWER_BOUND_MAX_PARALLELISM),
                    UPPER_BOUND_MAX_PARALLELISM);
        }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11

    我们实际开的并行度只有6,最终计算出的maxParallelism取值为 DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1<<7 即 128

  • parallelism

我们自己设置的6

# 编写代码模拟分配

				
				int maxParallelism = 1<<7;

        // i keyBy字段,代码中keyBy也使用的int类型
        for (int i = 0; i < 6; i++) {

            //注意这里要转为Object,与源码中方法相同
            Object key = i;

            // 代码中并行度
            int parallelism = 6;

            int keyGroupId = MathUtils.murmurHash(key.hashCode()) % maxParallelism;

            int index = keyGroupId * parallelism / maxParallelism;

            System.out.println("数据分配:"+index);

        }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

分配结果

数据分配:4
数据分配:4
数据分配:5
数据分配:5
数据分配:0
数据分配:5
1
2
3
4
5
6

# 验证

通过上面我们模拟计算的结果,发现数据分配至为0->1/5数据,4->2/5数据,5->3/5数据,与Flink中查看处理数据量比例相同。

image-20220811133414034

# 解决

既然知道计算方法,我们就按照算法进行数据平均分配,将keyBy的值进行相对应修改即可。

上次更新: 2023/03/10, 16:57:29
Flink启动脚本
Flink流处理高级操作之时间语义

← Flink启动脚本 Flink流处理高级操作之时间语义→

最近更新
01
Linux可视化监控
02-26
02
Maven私服搭建
02-26
03
当ElasticSearch时间字段设置多个格式到底是用的哪个?
01-19
更多文章>
Theme by Vdoing | Copyright © 2016-2025 Jast-zsh | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式