Hudi Compaction

Applies to version 0.10.1.

1 Asynchronous Compaction

Hudi uses an asynchronous compaction strategy by default. It consists of the following two phases:

  • Scheduling

    Done by the ingestion job. Hudi scans the partitions, selects the file slices to compact, and finally writes a compaction plan to the timeline.

  • Execution

    A separate process reads the compaction plan and compacts the selected file slices.

(1) Spark Structured Streaming

Enabled by default.

import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.HoodieDataSourceHelpers;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.ProcessingTime;

// Write the stream as a Hudi table, scheduling a compaction every 10 delta
// commits and executing the scheduled compactions asynchronously.
DataStreamWriter<Row> writer = streamingInput.writeStream().format("org.apache.hudi")
        .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), operationType)
        .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY(), tableType)
        .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key")
        .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition")
        .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp")
        .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, "10")
        .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY(), "true")
        .option(HoodieWriteConfig.TABLE_NAME, tableName)
        .option("checkpointLocation", checkpointLocation)
        .outputMode(OutputMode.Append());
// Trigger a micro-batch every 30 seconds.
writer.trigger(new ProcessingTime(30000)).start(tablePath);
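
The code above imports HoodieDataSourceHelpers but never uses it; a minimal sketch of how it can be used to verify that compactions actually complete (assuming `fs` is a Hadoop FileSystem handle for the table's storage, and `tablePath` as above):

import org.apache.hudi.HoodieDataSourceHelpers;
import org.apache.hudi.common.table.timeline.HoodieTimeline;

// All completed commit and compaction instants on the table's timeline.
HoodieTimeline completed = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, tablePath);
// The most recent completed instant, e.g. for freshness checks in tests.
String latestInstant = HoodieDataSourceHelpers.latestCommit(fs, tablePath);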

(2) DeltaStreamer Continuous Mode

A long-running Spark application continuously ingests data and compacts it.

spark-submit --packages org.apache.hudi:hudi-utilities-bundle_2.11:0.6.0 \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
--table-type MERGE_ON_READ \
--target-base-path <hudi_base_path> \
--target-table <hudi_table> \
--source-class org.apache.hudi.utilities.sources.JsonDFSSource \
--source-ordering-field ts \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--props /path/to/source.properties \
--continuous
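
For reference, a hypothetical minimal source.properties for the JsonDFSSource above; the property keys are standard DeltaStreamer/Hudi configs, while all paths and field names are placeholders:

hoodie.datasource.write.recordkey.field=_row_key
hoodie.datasource.write.partitionpath.field=partition
# Root directory the JsonDFSSource reads new files from (placeholder path)
hoodie.deltastreamer.source.dfs.root=/path/to/input
# Avro schema files consumed by FilebasedSchemaProvider (placeholder paths)
hoodie.deltastreamer.schemaprovider.source.schema.file=/path/to/source.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=/path/to/target.avsc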

2 Synchronous Compaction

If you care about how quickly committed data becomes readable, compaction can be run synchronously in the same job.

Pass --disable-compaction to turn off asynchronous compaction and compact synchronously instead. You can also configure resource allocation in the DeltaStreamer CLI with flags such as --delta-sync-scheduling-weight, --compact-scheduling-weight, --delta-sync-scheduling-minshare, and --compact-scheduling-minshare.
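
A sketch, reusing the placeholders from the continuous-mode command above, with asynchronous compaction disabled so that compaction runs inside the ingestion job:

spark-submit --packages org.apache.hudi:hudi-utilities-bundle_2.11:0.6.0 \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
--table-type MERGE_ON_READ \
--target-base-path <hudi_base_path> \
--target-table <hudi_table> \
--source-class org.apache.hudi.utilities.sources.JsonDFSSource \
--source-ordering-field ts \
--props /path/to/source.properties \
--continuous \
--disable-compaction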

3 Offline Compaction

For MOR tables, compaction is executed every 5 commits by default.

Note: a compaction task has two main steps: scheduling a compaction plan and executing that plan. It is recommended to let the write task trigger compaction-plan scheduling periodically; compaction.schedule.enabled is on by default.
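
For example, a sketch of a Flink SQL MOR table where the write task only schedules compaction plans and execution is left to an offline job; the table name and columns are assumptions, and the options are Hudi Flink connector options:

CREATE TABLE hudi_mor_table (
  id INT PRIMARY KEY NOT ENFORCED,
  name STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://xxx:9000/table',
  'table.type' = 'MERGE_ON_READ',
  'compaction.schedule.enabled' = 'true',  -- write task schedules plans (default)
  'compaction.async.enabled' = 'false',    -- execution is left to the offline job
  'compaction.delta_commits' = '5'         -- schedule a plan every 5 delta commits
);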

(1) Compaction Tool

Hudi provides a standalone tool to perform compaction asynchronously. The --instant-time argument is optional; if omitted, the earliest scheduled compaction is chosen.

spark-submit --packages org.apache.hudi:hudi-utilities-bundle_2.11:0.6.0 \
--class org.apache.hudi.utilities.HoodieCompactor \
--base-path <base_path> \
--table-name <table_name> \
--schema-file <schema_file> \
--instant-time <compaction_instant>

(2) CLI

hudi:trips->compaction run --tableName <table_name> --parallelism <parallelism> --compactionInstant <InstantTime>
...
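
To pick the instant to pass to --compactionInstant, the scheduled compactions can first be listed from the same CLI (a sketch; output elided):

hudi:trips->compactions show all
...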

(3) Flink Offline Compaction

# Command line
./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink-bundle_2.11-0.9.0.jar --path hdfs://xxx:9000/table
Options:

  • --path (required): the path where the target table is stored on Hudi.

  • --compaction-max-memory (optional, default 100): the index map size for log data during compaction, in MB. If you have enough memory, you can increase this value.

  • --schedule (optional, default false): whether to also schedule a compaction plan. Turning this on while a write job is still writing to the table risks losing data, so make sure no write task is currently writing to the table before enabling it.

  • --seq (optional, default LIFO): the order in which compaction plans are executed. LIFO executes the latest plan first (the default); FIFO executes the oldest plan first.
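
For example, a sketch that reuses the path above and lets the compactor schedule a plan itself, then execute plans oldest-first; per the --schedule remark, make sure no write task is writing to the table:

./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor \
lib/hudi-flink-bundle_2.11-0.9.0.jar \
--path hdfs://xxx:9000/table \
--schedule \
--seq FIFO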
