Applies to version 0.10.1.
1 Spark Datasource
A Hudi table can be loaded through the Spark datasource much like a standard source; for copy-on-write tables this can be as simple as spark.read.parquet.
(1) Snapshot query
```scala
// Snapshot query: read the latest committed view of the table (basePath is a placeholder).
val hudiSnapshotQueryDF = spark.read
  .format("hudi")
  .load(basePath)
```
(2) Incremental query
Query only the records that changed after a given beginInstantTime.
```java
// Incremental query: returns records changed after beginInstantTime
// (tablePath and beginInstantTime are placeholders).
Dataset<Row> hudiIncQueryDF = spark.read()
    .format("hudi")
    .option("hoodie.datasource.query.type", "incremental")
    .option("hoodie.datasource.read.begin.instanttime", beginInstantTime)
    .load(tablePath);
```
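For reference, a minimal Scala sketch of the same incremental query, with the result registered as a temporary view and queried through Spark SQL (the table path, begin instant time, and view name are illustrative):

```scala
val tablePath = "/path/to/hudi_table"   // placeholder
val beginInstantTime = "20220101000000" // placeholder commit instant

val hudiIncDF = spark.read
  .format("hudi")
  .option("hoodie.datasource.query.type", "incremental")
  .option("hoodie.datasource.read.begin.instanttime", beginInstantTime)
  .load(tablePath)

hudiIncDF.createOrReplaceTempView("hudi_incremental")
// _hoodie_commit_time is one of Hudi's metadata columns.
spark.sql("select `_hoodie_commit_time`, count(*) from hudi_incremental group by `_hoodie_commit_time`").show()
```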
In addition, HoodieReadClient offers the following functionality backed by Hudi's implicit index:
| API | Description |
|---|---|
| read(keys) | Read out the data corresponding to the keys as a DataFrame, using Hudi’s own index for faster lookup |
| filterExists() | Filter out already existing records from the provided RDD[HoodieRecord]. Useful for de-duplication |
| checkExists(keys) | Check if the provided keys exist in a Hudi table |
(3) Spark SQL
By default, Spark SQL uses its own Parquet implementation rather than the Hive SerDe when reading Hive metastore Parquet tables. Depending on the table type, note the following settings:
1) COW
Spark's default Parquet reader can be used so that built-in optimizations such as vectorized reads are retained. This requires setting the path filter as follows:
```scala
spark.sparkContext.hadoopConfiguration.setClass(
  "mapreduce.input.pathFilter.class",
  classOf[org.apache.hudi.hadoop.HoodieROTablePathFilter],
  classOf[org.apache.hadoop.fs.PathFilter])
```
Alternatively, the table can still be queried with Spark's native reader turned off (spark.sql.hive.convertMetastoreParquet=false), in which case Spark falls back to the Hive SerDe to read the data.
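As a small sketch (the table name hudi_cow_table is illustrative), a COW table synced to the Hive metastore can then be queried directly with Spark SQL, either with the path filter above or with the Parquet conversion disabled:

```scala
// With the path filter set, Spark's vectorized Parquet reader is used.
spark.sql("select count(*) from hudi_cow_table").show()

// Alternative: turn off Spark's native Parquet conversion and read via the Hive SerDe.
spark.sql("set spark.sql.hive.convertMetastoreParquet=false")
spark.sql("select count(*) from hudi_cow_table").show()
```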
2) MOR
For versions after 0.9.0, no additional configuration is needed.
2 Flink SQL
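Hudi tables can also be registered and queried through the Flink Hudi connector. A hedged sketch from the Flink Table API (the schema, path, and table name are illustrative, and the connector options should be checked against the 0.10.1 Flink bundle):

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
val tableEnv = TableEnvironment.create(settings)

// Register a Hudi table through the Flink Hudi connector; schema and path are placeholders.
tableEnv.executeSql(
  """
    |CREATE TABLE hudi_table (
    |  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
    |  name VARCHAR(20),
    |  ts TIMESTAMP(3),
    |  `partition` VARCHAR(20)
    |) PARTITIONED BY (`partition`)
    |WITH (
    |  'connector' = 'hudi',
    |  'path' = '/tmp/hudi_table',
    |  'table.type' = 'MERGE_ON_READ'
    |)
    |""".stripMargin)

// Snapshot query: reads the latest committed data.
tableEnv.executeSql("SELECT * FROM hudi_table").print()
```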
3 Hive
(1) Incremental query
HiveIncrementalPuller allows incremental extraction with HiveQL; it runs the Hive query over Hive JDBC and saves the result in a temporary table.
The upsert utility HoodieDeltaStreamer can obtain all the state it needs from the directory structure, e.g. app/incremental-hql/intermediate/{source_table_name}_temp/{last_commit_included}. The delta table is registered as {tmpdb}.{source_table}_{last_commit_included}.
HiveIncrementalPuller is configured as follows:
| Config | Description | Default |
|---|---|---|
| hiveUrl | Hive Server 2 URL to connect to | |
| hiveUser | Hive Server 2 Username | |
| hivePass | Hive Server 2 Password | |
| queue | YARN Queue name | |
| tmp | Directory where the temporary delta data is stored in DFS. The directory structure will follow conventions. Please see the below section. | |
| extractSQLFile | The SQL to execute on the source table to extract the data. The data extracted will be all the rows that changed since a particular point in time. | |
| sourceTable | Source Table Name. Needed to set hive environment properties. | |
| sourceDb | Source DB name. Needed to set hive environment properties. | |
| targetTable | Target Table Name. Needed for the intermediate storage directory structure. | |
| targetDb | Target table’s DB name. | |
| tmpdb | The database to which the intermediate temp delta table will be created | hoodie_temp |
| fromCommitTime | This is the most important parameter. This is the point in time from which the changed records are queried from. | |
| maxCommits | Number of commits to include in the query. Setting this to -1 will include all the commits from fromCommitTime. Setting this to a value > 0, will include records that ONLY changed in the specified number of commits after fromCommitTime. This may be needed if you need to catch up say 2 commits at a time. | 3 |
| help | Utility Help | |
Setting fromCommitTime=0 and maxCommits=-1 fetches the entire source table and can be used to initiate backfills. A current limitation is that the same table cannot be self-joined across different query modes (snapshot and incremental) within one query.
Note that Hive incremental queries are executed via the fetch task. Because the fetch task calls InputFormat.listStatus() once per partition, the Hoodie metadata is listed on every call. Setting hive.fetch.task.conversion=none forces MapReduce execution, which combines the partitions and calls listStatus() only once.
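A minimal sketch of applying this setting over Hive JDBC before running the query (the JDBC URL, credentials, and table name are placeholders):

```scala
import java.sql.DriverManager

Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://hiveserver:10000/default", "hiveuser", "")
val stmt = conn.createStatement()

// Disable fetch-task conversion so the query runs as MapReduce and the Hudi
// InputFormat's listStatus() is invoked once for all partitions.
stmt.execute("set hive.fetch.task.conversion=none")

val rs = stmt.executeQuery("select count(*) from hudi_table")
while (rs.next()) println(rs.getLong(1))

rs.close(); stmt.close(); conn.close()
```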
4 PrestoDB
5 Trino
6 Impala(>=3.4)
7 Support matrix
(1) COW
| Query Engine | Snapshot Queries | Incremental Queries |
|---|---|---|
| Hive | Y | Y |
| Spark SQL | Y | Y |
| Spark Datasource | Y | Y |
| Flink SQL | Y | N |
| PrestoDB | Y | N |
| Trino | Y | N |
| Impala | Y | N |
(2) MOR
| Query Engine | Snapshot Queries | Incremental Queries | Read Optimized Queries |
|---|---|---|---|
| Hive | Y | Y | Y |
| Spark SQL | Y | Y | Y |
| Spark Datasource | Y | Y | Y |
| Flink SQL | Y | Y | Y |
| PrestoDB | Y | N | Y |
| Trino | N | N | Y |
| Impala | N | N | Y |