Hudi Data Queries

Applies to Hudi version 0.10.1.

1 Spark Datasource

Loading a Hudi table can be as simple as calling spark.read.parquet.
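
For instance, a minimal sketch of such a direct Parquet read on a COW table; the four-level glob is an assumption for a three-level partition layout, and the HoodieROTablePathFilter shown in the Spark SQL subsection below should be configured so that only the latest file slices are read:

// Hypothetical direct Parquet read of a COW table; adjust the glob depth to the table's partitioning.
val hudiParquetDF = spark.read.parquet(tablePath + "/*/*/*/*")
hudiParquetDF.printSchema()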

(1) Snapshot query

val hudiSnapshotQueryDF = spark
  .read
  .format("hudi")
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
  .load(tablePath)
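
The returned DataFrame can then be registered and queried like any other; for example (the hudi_trips_snapshot view name and the fare/begin_lon/begin_lat/ts columns follow the trips example used in the incremental query below):

hudiSnapshotQueryDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()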

(2) Incremental query

Queries the incremental data committed after beginInstantTime.

val hudiIncQueryDF = spark.read
  .format("org.apache.hudi")
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
  .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, <beginInstantTime>)
  .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, "/year=2020/month=*/day=*") // optional, use a glob pattern if querying only certain partitions
  .load(tablePath) // for an incremental query, pass in the root/base path of the table

hudiIncQueryDF.createOrReplaceTempView("hudi_trips_incremental")
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()

In addition, HoodieReadClient provides the following APIs, which use Hudi's implicit index.

| API | Description |
| --- | --- |
| read(keys) | Read out the data corresponding to the keys as a DataFrame, using Hudi's own index for faster lookup |
| filterExists() | Filter out already existing records from the provided RDD[HoodieRecord]. Useful for de-duplication |
| checkExists(keys) | Check if the provided keys exist in a Hudi table |

(3) Spark SQL

By default, Spark SQL uses its own Parquet implementation rather than the Hive SerDe.

The following settings need attention, depending on the table type:

1) COW

Spark's default Parquet reader can be used so that built-in optimizations such as vectorized reads are retained. The path filter must be set as follows:

spark.sparkContext.hadoopConfiguration.setClass(
  "mapreduce.input.pathFilter.class",
  classOf[org.apache.hudi.hadoop.HoodieROTablePathFilter],
  classOf[org.apache.hadoop.fs.PathFilter])

Queries still work even if the default Parquet reader is turned off; reads then go through the Hive SerDe path.
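
One way to turn the default Parquet reader off for the current session is Spark's spark.sql.hive.convertMetastoreParquet setting, for example:

spark.sql("set spark.sql.hive.convertMetastoreParquet=false")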

2) MOR

For versions above 0.9.0, no additional configuration is needed.
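
For context, when a MOR table is synced to the metastore via Hudi's Hive sync, it is registered as two tables, <table>_ro (read optimized) and <table>_rt (snapshot). A hypothetical Spark SQL query against each (the table name hudi_trips_mor is an assumption):

spark.sql("select count(*) from hudi_trips_mor_ro").show() // read optimized query
spark.sql("select count(*) from hudi_trips_mor_rt").show() // snapshot query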

3 Hive

(1) Incremental query

HiveIncrementalPuller allows incrementally pulling changes via HiveQL; it runs the Hive query over Hive JDBC and saves the result in a temporary table.

The upsert utility HoodieDeltaStreamer obtains the state information it needs from the directory structure, e.g. app/incremental-hql/intermediate/{source_table_name}_temp/{last_commit_included}. The delta table is registered as {tmpdb}.{source_table}_{last_commit_included}.

HiveIncrementalPuller is configured as follows:

| Config | Description | Default |
| --- | --- | --- |
| hiveUrl | Hive Server 2 URL to connect to | |
| hiveUser | Hive Server 2 Username | |
| hivePass | Hive Server 2 Password | |
| queue | YARN Queue name | |
| tmp | Directory where the temporary delta data is stored in DFS. The directory structure will follow conventions. Please see the below section. | |
| extractSQLFile | The SQL to execute on the source table to extract the data. The data extracted will be all the rows that changed since a particular point in time. | |
| sourceTable | Source Table Name. Needed to set hive environment properties. | |
| sourceDb | Source DB name. Needed to set hive environment properties. | |
| targetTable | Target Table Name. Needed for the intermediate storage directory structure. | |
| targetDb | Target table's DB name. | |
| tmpdb | The database to which the intermediate temp delta table will be created | hoodie_temp |
| fromCommitTime | This is the most important parameter. This is the point in time from which the changed records are queried from. | |
| maxCommits | Number of commits to include in the query. Setting this to -1 will include all the commits from fromCommitTime. Setting this to a value > 0 will include only records that changed in the specified number of commits after fromCommitTime. This may be needed if you need to catch up, say, 2 commits at a time. | 3 |
| help | Utility Help | |

Setting fromCommitTime=0 and maxCommits=-1 pulls the entire source table and can be used to initialize a backfill. A current limitation is that a table cannot be self-joined across different query modes (e.g., incremental and snapshot) in the same query.

Note that Hive incremental queries are executed with Fetch tasks. Because a Fetch task calls InputFormat.listStatus() once per partition, the Hoodie metadata is listed on every such call. Setting set hive.fetch.task.conversion=none forces MapReduce execution, which combines the partitions and calls listStatus() only once.
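
To make the mechanics concrete, the sketch below shows what such an incremental Hive query looks like when issued over Hive JDBC, the same path HiveIncrementalPuller uses. The JDBC URL, credentials, table name hudi_trips and the commit timestamp are placeholders, and the hive-jdbc driver must be on the classpath:

import java.sql.DriverManager

val conn = DriverManager.getConnection("jdbc:hive2://hiveserver:10000/default", "user", "password")
val stmt = conn.createStatement()
try {
  // Force MapReduce execution so InputFormat.listStatus() is called only once (see the note above).
  stmt.execute("set hive.fetch.task.conversion=none")
  // Put the table into incremental consume mode and bound the commit range.
  stmt.execute("set hoodie.hudi_trips.consume.mode=INCREMENTAL")
  stmt.execute("set hoodie.hudi_trips.consume.start.timestamp=20220101000000")
  stmt.execute("set hoodie.hudi_trips.consume.max.commits=3")
  val rs = stmt.executeQuery("select `_hoodie_commit_time`, fare from hudi_trips where `_hoodie_commit_time` > '20220101000000'")
  while (rs.next()) {
    println(rs.getString(1) + "\t" + rs.getDouble(2))
  }
} finally {
  stmt.close()
  conn.close()
}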

4 PrestoDB

5 Trino

6 Impala(>=3.4)

7 Support Matrix

(1) COW

| Query Engine | Snapshot Queries | Incremental Queries |
| --- | --- | --- |
| Hive | Y | Y |
| Spark SQL | Y | Y |
| Spark Datasource | Y | Y |
| Flink SQL | Y | N |
| PrestoDB | Y | N |
| Trino | Y | N |
| Impala | Y | N |

(2) MOR

| Query Engine | Snapshot Queries | Incremental Queries | Read Optimized Queries |
| --- | --- | --- | --- |
| Hive | Y | Y | Y |
| Spark SQL | Y | Y | Y |
| Spark Datasource | Y | Y | Y |
| Flink SQL | Y | Y | Y |
| PrestoDB | Y | N | Y |
| Trino | N | N | Y |
| Impala | N | N | Y |
