Applies to Hudi version 0.10.1.
1 Spark Datasource
Hudi tables can be loaded simply via spark.read.parquet.
(1) Snapshot query
```scala
// Snapshot query: reads the latest snapshot of the table (tablePath is a placeholder for the table base path)
val hudiSnapshotQueryDF = spark.read.format("hudi").load(tablePath)
```
(2) Incremental query
Queries the records that changed after beginInstantTime.
```java
// beginInstantTime and tablePath are placeholders (begin commit time and table base path)
Dataset<Row> hudiIncQueryDF = spark.read().format("hudi")
    .option("hoodie.datasource.query.type", "incremental")
    .option("hoodie.datasource.read.begin.instanttime", beginInstantTime)
    .load(tablePath);
```
HoodieReadClient provides the following APIs on top of Hudi's implicit (built-in) index.
API | Description |
---|---|
read(keys) | Read out the data corresponding to the keys as a DataFrame, using Hudi’s own index for faster lookup |
filterExists() | Filter out already existing records from the provided RDD[HoodieRecord] . Useful for de-duplication |
checkExists(keys) | Check if the provided keys exist in a Hudi table |
(3) Spark SQL
By default, Spark SQL uses its own Parquet implementation rather than the Hive SerDe.
Pay attention to the following settings for the different table types:
1) COW
The default Spark Parquet reader can be used to retain built-in optimizations such as vectorized reads. To do so, set the path filter as follows:
```scala
spark.sparkContext.hadoopConfiguration.setClass(
  "mapreduce.input.pathFilter.class",
  classOf[org.apache.hudi.hadoop.HoodieROTablePathFilter],
  classOf[org.apache.hadoop.fs.PathFilter])
```
Queries still work if the default Spark reader is turned off (e.g., by setting spark.sql.hive.convertMetastoreParquet=false to fall back to the Hive SerDe).
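A minimal sketch of that fallback in a Spark SQL session (hudi_cow_table is a placeholder for a Hudi COW table registered in the Hive metastore):

```sql
-- Fall back to the Hive SerDe instead of Spark's native Parquet reader
SET spark.sql.hive.convertMetastoreParquet=false;
-- hudi_cow_table is a placeholder table name
SELECT count(*) FROM hudi_cow_table;
```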
2) MOR
For versions newer than 0.9.0, no additional configuration is needed.
2 Flink SQL
3 Hive
(1) Incremental query
HiveIncrementalPuller enables incremental queries with HiveQL: it runs the Hive query over Hive JDBC and saves the results in a temporary table.
The upsert utility (HoodieDeltaStreamer) can obtain all the state it needs from the directory structure, e.g. app/incremental-hql/intermediate/{source_table_name}_temp/{last_commit_included}. The delta table is registered as {tmpdb}.{source_table}_{last_commit_included}.
HiveIncrementalPuller is configured as follows:
Config | Description | Default |
---|---|---|
hiveUrl | Hive Server 2 URL to connect to | |
hiveUser | Hive Server 2 Username | |
hivePass | Hive Server 2 Password | |
queue | YARN Queue name | |
tmp | Directory where the temporary delta data is stored in DFS. The directory structure will follow conventions. Please see the below section. | |
extractSQLFile | The SQL to execute on the source table to extract the data. The data extracted will be all the rows that changed since a particular point in time. | |
sourceTable | Source Table Name. Needed to set hive environment properties. | |
sourceDb | Source DB name. Needed to set hive environment properties. | |
targetTable | Target Table Name. Needed for the intermediate storage directory structure. | |
targetDb | Target table’s DB name. | |
tmpdb | The database in which the intermediate temp delta table is created | hoodie_temp |
fromCommitTime | This is the most important parameter. It is the point in time from which the changed records are queried. | |
maxCommits | Number of commits to include in the query. Setting this to -1 will include all the commits from fromCommitTime. Setting this to a value > 0 will only include records that changed in the specified number of commits after fromCommitTime. This may be needed if you need to catch up, say, 2 commits at a time. | 3 |
help | Utility Help | |
Setting fromCommitTime=0 and maxCommits=-1 fetches the entire source table and can be used to initiate backfills. A current limitation is that a table cannot be self-joined across different query modes (e.g., normal and incremental) within the same query.
Note that Hive incremental queries are executed using Fetch tasks. Because a Fetch task calls InputFormat.listStatus() once per partition, the Hudi metadata is listed on every call. Setting hive.fetch.task.conversion=none forces MapReduce execution instead, which combines the partitions and calls InputFormat.listStatus() only once.
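For reference, a minimal sketch of the session-level settings for a Hive incremental query; the table name hudi_trips and the commit time 20220101000000 are placeholders:

```sql
-- Force MapReduce execution so partitions are combined and listStatus() is called only once
SET hive.fetch.task.conversion=none;
SET hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
-- hudi_trips / 20220101000000 are placeholders for the synced table name and begin commit time
SET hoodie.hudi_trips.consume.mode=INCREMENTAL;
SET hoodie.hudi_trips.consume.start.timestamp=20220101000000;
SET hoodie.hudi_trips.consume.max.commits=3;
SELECT * FROM hudi_trips WHERE `_hoodie_commit_time` > '20220101000000';
```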
4 PrestoDB
5 Trino
6 Impala(>=3.4)
7 Support matrix
(1) COW
Query Engine | Snapshot Queries | Incremental Queries |
---|---|---|
Hive | Y | Y |
Spark SQL | Y | Y |
Spark Datasource | Y | Y |
Flink SQL | Y | N |
PrestoDB | Y | N |
Trino | Y | N |
Impala | Y | N |
(2) MOR
Query Engine | Snapshot Queries | Incremental Queries | Read Optimized Queries |
---|---|---|---|
Hive | Y | Y | Y |
Spark SQL | Y | Y | Y |
Spark Datasource | Y | Y | Y |
Flink SQL | Y | Y | Y |
PrestoDB | Y | N | Y |
Trino | N | N | Y |
Impala | N | N | Y |