数据平台小文件问题分析及防治总结

2021/10/18 Big Data 共 1564 字,约 5 分钟
欢迎

结合工作经验,梳理数据平台小文件过多的危害、预防以及处理相关知识。

危害

小文件会在存储和计算方面影响平台性能:

  • 存储方面,单个文件在 HDFS 的 NameNode 上占用内存是相同的,存储总量相同条件下,小文件势必会导致文件数量的增加,降低内存利用效率。
  • 计算方面,一个小文件的数据量不大,但会对应起一个 MapTask,造成计算资源的低效使用。(Spark 的切片规则与 MR 相同,小文件对应首个 Stage 里的 Task。)

预防

在产出结果数据文件时,可以通过相关设置,减少写出的 reducer/并行度数目,降低文件数量和小文件生成概率。

  • MR 中可以根据业务适度减少 reducer 个数,从而落盘文件的数目可能会减少。
  • Hive 使用 MapReduce 作为计算引擎时,可以调整以下参数:
    • set hive.merge.mapfiles=true; 默认即为 true,在 map-only 任务结束时合并小文件;
    • set hive.merge.mapredfiles=true; 默认 false,在 map-reduce 任务结束时合并小文件。
  • Spark 的 coalesce 算子可以减少分区数,实现降低并行度。(repartition 算子可以增加或减少分区数,可能会有 shuffle 过程。)
  • 在 SparkSQL(含 StructuredStreaming)中,可以使用 hint 语法 ` /*+ coalesce(n) */ ` 来减少数据分区。非动态分区写出时效果较为明显。(repartition 同理。)
  • HiveSQL 和 SparkSQL 动态分区写出时,建议灵活应用 distribute by。
    • distribute by dt 可以先将相同分区的数据发送至相同 reduce 任务中,从而降低小文件概率。可能会发生数据倾斜。
    • set hive.exec.reducers.bytes.per.reducer = 1024000000; 可以设定单个 reducer 的大小,配合 distribute by rand() 可以负载均衡。
    • insert into test select * from table distribute by cast(rand()*n as int); 能够一定程度上控制单个分区的文件个数。相同分区的数据会进入 n 个 reducer,单个分区产生至少 n 个文件。与上条 hint 用法效果类似。
  • 实时处理中建议将数据存入 kafka 、redis 等非 HDFS 组件中,或者直接选用 Hudi(具备小文件自动合并特性)。
  • 实时数据如果确实要落盘 Hive 表,建议尽量缩短表的生命周期。Flink 任务写入 Hive 时可以做以下调整:
    • 设置 sink.shuffle-by-partition.enable=ture,程序会启用单个并行度写出(多个并行度会产生多个写出文件)。
    • 若程序有跨分区写出情况,即使设置了上述参数,不同分区仍可能会使用不同并行度写出,因此需要合理设置分区字段。
    • 设置 auto-compaction=ture,程序会在单个 checkpoint 周期内自动合并小文件,为提高效果可以考虑适当延长 checkpoint 间隔,详细见官方文档
    • 启用定时任务覆盖重写原数据,也是一种简易方法。

处理

如果已经产生小文件,可以针对影响方面进行针对性处理。

  • set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; 控制 Hive 输入方式,可以将小文件合并。
  • set mapreduce.job.jvm.numtasks=10 开启 JVM 重用(仅针对 MR 有效)。
  • Hadoop Archive 是一个高效将小文件放入 HDFS 块中的文件存档工具,能够将多个小文件打包成一个 HAR 文件,从而实现减少 NameNode 的内存使用,而不影响文件本身。使用案例可参考 Hadoop Archive 存档的使用

文档信息

Search

    Table of Contents