Applies to version 0.10.1.
1 Spark Datasource Writer
HoodieWriteConfig
TABLE_NAME: required
DataSourceWriteOptions
RECORDKEY_FIELD_OPT_KEY
Required. Uniquely identifies a record within a partition. A global index can make the key unique across the whole table. Default: uuid
PARTITIONPATH_FIELD_OPT_KEY
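Required. Specifies the partition column. An empty string disables partitioning. URL_ENCODE_PARTITIONING_OPT_KEY enables URL encoding of the partition path, and HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY is used when syncing to Hive. Default: partitionpath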
PRECOMBINE_FIELD_OPT_KEY
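Required. When several records in the same batch share a key, the record with the largest value in this column is the one written. Replacing the default payload class (OverwriteWithLatestAvroPayload) with custom merge logic makes this setting ineffective. Default: ts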
OPERATION_OPT_KEY
The write operation to use. Default: UPSERT_OPERATION_OPT_VAL. Other options: BULK_INSERT_OPERATION_OPT_VAL, INSERT_OPERATION_OPT_VAL, DELETE_OPERATION_OPT_VAL
TABLE_TYPE_OPT_KEY
The type of table to write. It must match the type the table was created with, and Spark must write with SaveMode.Append. Default: COW_TABLE_TYPE_OPT_VAL. Other option: MOR_TABLE_TYPE_OPT_VAL
KEYGENERATOR_CLASS_OPT_KEY
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY
When using Hive, specifies whether the table is partitioned.
Default: classOf[SlashEncodedDayPartitionValueExtractor].getCanonicalName
Other options: classOf[MultiPartKeysValueExtractor].getCanonicalName, classOf[TimestampBasedKeyGenerator].getCanonicalName, classOf[NonPartitionedExtractor].getCanonicalName, classOf[GlobalDeleteKeyGenerator].getCanonicalName (when OPERATION_OPT_KEY is DELETE_OPERATION_OPT_VAL)
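The precombine rule described under PRECOMBINE_FIELD_OPT_KEY can be pictured with a small toy model. This is only a sketch of the idea, not Hudi's implementation: the `Row` type, the `precombine` helper, and the choice to let a later row win a tie are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of precombining: per record key, keep the row with the largest ordering value. */
final class PrecombineSketch {

    /** Hypothetical row: a record key, an ordering value (the "ts" column), and a payload. */
    static final class Row {
        final String key;
        final long ts;
        final String payload;

        Row(String key, long ts, String payload) {
            this.key = key;
            this.ts = ts;
            this.payload = payload;
        }
    }

    static List<Row> precombine(List<Row> batch) {
        Map<String, Row> winners = new LinkedHashMap<>();
        for (Row row : batch) {
            // Keep the row already stored unless the incoming one has a larger (or equal) ts.
            // Letting the later row win a tie is a choice made for this sketch.
            winners.merge(row.key, row, (kept, incoming) -> incoming.ts >= kept.ts ? incoming : kept);
        }
        return new ArrayList<>(winners.values());
    }

    public static void main(String[] args) {
        List<Row> batch = Arrays.asList(
            new Row("id-1", 10L, "first"),
            new Row("id-2", 5L, "only"),
            new Row("id-1", 20L, "second"));
        for (Row row : precombine(batch)) {
            System.out.println(row.key + " -> " + row.payload);
        }
        // prints:
        // id-1 -> second
        // id-2 -> only
    }
}
```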
inputDF.write()
.format("org.apache.hudi")
.options(clientOpts) //Where clientOpts is of type Map[String, String]. clientOpts can include any other options necessary.
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp")
.option(HoodieWriteConfig.TABLE_NAME, tableName)
.mode(SaveMode.Append)
.save(basePath);
Spark SQL example:
insert into h0 select 1, 'a1', 20;
-- insert static partition
insert into h_p0 partition(dt = '2021-01-02') select 1, 'a1';
-- insert dynamic partition
insert into h_p0 select 1, 'a1', dt;
-- insert dynamic partition
insert into h_p1 select 1 as id, 'a1', '2021-01-03' as dt, '19' as hh;
-- insert overwrite table
insert overwrite table h0 select 1, 'a1', 20;
-- insert overwrite table with static partition
insert overwrite h_p0 partition(dt = '2021-01-02') select 1, 'a1';
-- insert overwrite table with dynamic partition
insert overwrite table h_p1 select 2 as id, 'a2', '2021-01-03' as dt, '19' as hh;
Notes:
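Insert
- Strict mode: inserting a duplicate key throws an exception on a COW table and updates the record on a MOR table.
- Non-strict mode: behaves like the insert operation of the Spark DataSource. The mode can be changed with hoodie.sql.insert.mode.
Bulk insert
Insert is used by default; set hoodie.sql.bulk.insert.enable to switch to bulk insert.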
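The difference between the two insert modes is easier to see in a toy model. The sketch below only mirrors the behaviour as these notes describe it; it does not call Hudi, and `InsertModeSketch`, its `TableType` enum, and the `insert` helper are hypothetical names. It also assumes that "behaves like the DataSource insert" means duplicates are simply appended.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of strict vs. non-strict insert as described in the notes above. */
final class InsertModeSketch {

    enum TableType { COW, MOR }

    /** Rows are {key, value} pairs held in a plain list standing in for the table. */
    static void insert(List<String[]> table, TableType type, boolean strict, String key, String value) {
        int existing = -1;
        for (int i = 0; i < table.size(); i++) {
            if (table.get(i)[0].equals(key)) {
                existing = i;
                break;
            }
        }
        if (strict && existing >= 0) {
            if (type == TableType.COW) {
                // Strict mode on a COW table: a duplicate key is rejected.
                throw new IllegalStateException("duplicate key: " + key);
            }
            // Strict mode on a MOR table: the existing record is updated.
            table.set(existing, new String[] {key, value});
            return;
        }
        // Non-strict mode (or a new key): the row is appended, duplicates included.
        table.add(new String[] {key, value});
    }

    public static void main(String[] args) {
        List<String[]> mor = new ArrayList<>();
        insert(mor, TableType.MOR, true, "1", "a1");
        insert(mor, TableType.MOR, true, "1", "a2");
        System.out.println(mor.size() + " row(s), value = " + mor.get(0)[1]);
        // prints: 1 row(s), value = a2
    }
}
```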
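(1) Insert overwrite on a non-partitioned table
More efficient than deleting first and then inserting. The Hudi Cleaner eventually cleans up the file groups of the previous snapshot.
insert overwrite table h0 select 1, 'a1', 20;
(2) Insert overwrite on a partitioned table
Because it bypasses the indexing, precombining, and repartitioning steps of the upsert write path, this is more efficient than upsert.
insert overwrite table h_p1 select 2 as id, 'a2', '2021-01-03' as dt, '19' as hh;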
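(3) Deletes
Hudi supports two kinds of deletes; see Delete support in Hudi.
Soft deletes
Keep the record key and set every other field to null.
Hard deletes
Physically remove the record.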
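The two kinds of deletes can be contrasted on an in-memory record. This is an illustrative sketch only: records are modelled as plain maps, and `DeleteSketch`, `softDelete`, and `hardDelete` are hypothetical helpers rather than Hudi APIs. In a real job the nulled-out record of a soft delete would still have to be written back to the table.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy contrast between a soft delete and a hard delete on map-based records. */
final class DeleteSketch {

    /** Soft delete: keep the record key, null out every other field. */
    static Map<String, Object> softDelete(Map<String, Object> record, String keyField) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> field : record.entrySet()) {
            out.put(field.getKey(), field.getKey().equals(keyField) ? field.getValue() : null);
        }
        return out;
    }

    /** Hard delete: the record disappears from the table altogether. */
    static void hardDelete(Map<String, Map<String, Object>> table, String key) {
        table.remove(key);
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("uuid", "id-1");
        record.put("rider", "rider-A");
        record.put("fare", 27.7);

        System.out.println(softDelete(record, "uuid"));
        // prints: {uuid=id-1, rider=null, fare=null}

        Map<String, Map<String, Object>> table = new LinkedHashMap<>();
        table.put("id-1", record);
        hardDelete(table, "id-1");
        System.out.println(table.isEmpty());
        // prints: true
    }
}
```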
Hard deletes can be issued in three ways:
With the Datasource, set OPERATION_OPT_KEY to DELETE_OPERATION_OPT_VAL.
// df is assumed to be an existing DataFrame holding the records to delete
val deletes = dataGen.generateDeletes(df.collectAsList())
// use a new name so the input DataFrame above is not shadowed
val deleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
deleteDf.write.format("org.apache.hudi").
options(getQuickstartWriteConfigs).
// "delete" is the value of DELETE_OPERATION_OPT_VAL
option(OPERATION_OPT_KEY,"delete").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
With the DataSource, set PAYLOAD_CLASS_OPT_KEY to "org.apache.hudi.EmptyHoodieRecordPayload". Every record in the submitted DataFrame is deleted.
deleteDF // dataframe containing just records to be deleted
.write().format("org.apache.hudi")
.option(...) // Add HUDI options like record-key, partition-path and others as needed for your setup
// specify record_key, partition_key, precombine_fieldkey & usual params
.option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.EmptyHoodieRecordPayload")
With the DataSource or DeltaStreamer, add a column named _hoodie_is_deleted to the records that should be deleted and set it to true.
(4) Concurrency control
The hudi-spark module provides the DataSource API for reading and writing Spark DataFrames to and from a Hudi table. See concurrency control concepts for details.
An example:
inputDF.write.format("hudi")
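The write call above needs a few extra options before multiple writers can safely share a table. The sketch below collects the keys I believe the Hudi concurrency control documentation uses for optimistic concurrency control with a ZooKeeper lock provider; treat the key names as something to verify against your Hudi version. The `OccOptionsSketch` class and every ZooKeeper value are placeholders.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Builds the option map a multi-writer job would pass to the Hudi DataSource. */
final class OccOptionsSketch {

    static Map<String, String> occOptions(String zkUrl, String zkPort, String lockKey, String zkBasePath) {
        Map<String, String> opts = new LinkedHashMap<>();
        // Switch from the single-writer default to optimistic concurrency control.
        opts.put("hoodie.write.concurrency.mode", "optimistic_concurrency_control");
        // Failed writes are cleaned lazily so one writer does not roll back another's in-flight commit.
        opts.put("hoodie.cleaner.policy.failed.writes", "LAZY");
        // Lock provider and its ZooKeeper coordinates (all values are placeholders).
        opts.put("hoodie.write.lock.provider",
                 "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider");
        opts.put("hoodie.write.lock.zookeeper.url", zkUrl);
        opts.put("hoodie.write.lock.zookeeper.port", zkPort);
        opts.put("hoodie.write.lock.zookeeper.lock_key", lockKey);
        opts.put("hoodie.write.lock.zookeeper.base_path", zkBasePath);
        return opts;
    }

    public static void main(String[] args) {
        Map<String, String> opts = occOptions("zk-host", "2181", "my_table", "/hudi/locks");
        for (Map.Entry<String, String> e : opts.entrySet()) {
            System.out.println(e.getKey() + " = " + e.getValue());
        }
    }
}
```

Such a map would be handed to the writer through `.options(...)`, alongside the record key, partition path, and precombine options shown earlier.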
(5) Commit notifications
Hudi can send callback notifications about write commits.
HTTP
| Config | Description | Required | Default |
| --- | --- | --- | --- |
| TURN_CALLBACK_ON | Turn commit callback on/off | optional | false (callbacks off) |
| CALLBACK_HTTP_URL | Callback host to be sent along with callback messages | required | N/A |
| CALLBACK_HTTP_TIMEOUT_IN_SECONDS | Callback timeout in seconds | optional | 3 |
| CALLBACK_CLASS_NAME | Full path of callback class and must be a subclass of HoodieWriteCommitCallback class, org.apache.hudi.callback.impl.HoodieWriteCommitHttpCallback by default | optional | org.apache.hudi.callback.impl.HoodieWriteCommitHttpCallback |
| CALLBACK_HTTP_API_KEY_VALUE | Http callback API key | optional | hudi_write_commit_http_callback |
Kafka
| Config | Description | Required | Default |
| --- | --- | --- | --- |
| TOPIC | Kafka topic name to publish timeline activity into. | required | N/A |
| PARTITION | It may be desirable to serialize all changes into a single Kafka partition for providing strict ordering. By default, Kafka messages are keyed by table name, which guarantees ordering at the table level, but not globally (or when new partitions are added) | required | N/A |
| RETRIES | Times to retry the produce | optional | 3 |
| ACKS | kafka acks level, all by default to ensure strong durability | optional | all |
| BOOTSTRAP_SERVERS | Bootstrap servers of kafka cluster, to be used for publishing commit metadata | required | N/A |
Custom implementation
Extend HoodieWriteCommitCallback to implement your own message callback. See the API for details.
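To switch on the HTTP callback from the table above, the writer needs the matching string keys. The sketch below uses the key names I understand those constants to resolve to in 0.10.x (prefix `hoodie.write.commit.callback.`); verify them against your Hudi version. The `HttpCallbackOptionsSketch` class and the URL are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Builds the options that enable the HTTP commit callback. */
final class HttpCallbackOptionsSketch {

    static Map<String, String> httpCallbackOptions(String callbackUrl) {
        // The URL is marked "required" in the table, so fail early when it is missing.
        if (callbackUrl == null || callbackUrl.isEmpty()) {
            throw new IllegalArgumentException("callback URL is required");
        }
        Map<String, String> opts = new LinkedHashMap<>();
        opts.put("hoodie.write.commit.callback.on", "true");                  // TURN_CALLBACK_ON
        opts.put("hoodie.write.commit.callback.http.url", callbackUrl);       // CALLBACK_HTTP_URL
        opts.put("hoodie.write.commit.callback.http.timeout.seconds", "3");   // default from the table
        return opts;
    }

    public static void main(String[] args) {
        // The endpoint below is a placeholder, not a real service.
        Map<String, String> opts = httpCallbackOptions("http://localhost:8080/hudi/commits");
        for (Map.Entry<String, String> e : opts.entrySet()) {
            System.out.println(e.getKey() + " = " + e.getValue());
        }
    }
}
```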
2 Flink SQL Writer
Sink table options
Option Name | Required | Default | Remarks |
---|---|---|---|
path | Y | N/A | Base path for the target hoodie table. The path is created if it does not exist; otherwise it is expected to hold a successfully initialized hudi table |
table.type | N | COPY_ON_WRITE | Type of table to write. COPY_ON_WRITE (or) MERGE_ON_READ |
write.operation | N | upsert | The write operation, that this write should do (insert or upsert is supported) |
write.precombine.field | N | ts | Field used in preCombining before actual write. When two records have the same key value, we will pick the one with the largest value for the precombine field, determined by Object.compareTo(..) |
write.payload.class | N | OverwriteWithLatestAvroPayload.class | Payload class used. Override this if you want to roll your own merge logic when upserting/inserting. Doing so renders any value set for write.precombine.field ineffective |
write.insert.drop.duplicates | N | false | Flag to indicate whether to drop duplicates upon insert. By default insert will accept duplicates, to gain extra performance |
write.ignore.failed | N | true | Flag to indicate whether to ignore any non-exception error (e.g. a writestatus error) within a checkpoint batch. By default true (in favor of streaming progress over data integrity) |
hoodie.datasource.write.recordkey.field | N | uuid | Record key field. Value to be used as the recordKey component of HoodieKey . Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using the dot notation eg: a.b.c |
hoodie.datasource.write.keygenerator.class | N | SimpleAvroKeyGenerator.class | Key generator class that extracts the key out of the incoming record |
write.tasks | N | 4 | Parallelism of tasks that do actual write, default is 4 |
write.batch.size.MB | N | 128 | Batch buffer size in MB to flush data into the underneath filesystem |
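The sink options above end up in the WITH clause of a Flink SQL CREATE TABLE statement. As a rough sketch, the helper below renders such a statement from an option map. `FlinkSinkDdlSketch`, the table name, the column list, and the path are made up for illustration, and values are not escaped. `'connector' = 'hudi'` is the connector identifier the Hudi Flink bundle registers.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Renders a Flink SQL CREATE TABLE statement for a Hudi sink from an option map. */
final class FlinkSinkDdlSketch {

    static String createTable(String tableName, String columns, Map<String, String> options) {
        StringJoiner with = new StringJoiner(",\n  ", "WITH (\n  ", "\n)");
        for (Map.Entry<String, String> e : options.entrySet()) {
            // No escaping is done here; keys and values must not contain single quotes.
            with.add("'" + e.getKey() + "' = '" + e.getValue() + "'");
        }
        return "CREATE TABLE " + tableName + " (\n  " + columns + "\n)\n" + with;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "hudi");
        options.put("path", "file:///tmp/hudi/t1");      // hypothetical base path
        options.put("table.type", "MERGE_ON_READ");
        options.put("write.precombine.field", "ts");
        options.put("write.tasks", "4");

        // Hypothetical schema; uuid matches the default record key field.
        String columns = "uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,\n  name VARCHAR(10),\n  ts TIMESTAMP(3)";
        System.out.println(createTable("t1", columns, options));
    }
}
```

The generated statement would be submitted through the Flink SQL client or a TableEnvironment, followed by an INSERT INTO against the new table.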
Async compaction strategy for MOR tables
Option Name | Required | Default | Remarks |
---|---|---|---|
compaction.async.enabled | N | true | Async Compaction, enabled by default for MOR |
compaction.trigger.strategy | N | num_commits | Strategy to trigger compaction, options are ‘num_commits’: trigger compaction when reach N delta commits; ‘time_elapsed’: trigger compaction when time elapsed > N seconds since last compaction; ‘num_and_time’: trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied; ‘num_or_time’: trigger compaction when NUM_COMMITS or TIME_ELAPSED is satisfied. Default is ‘num_commits’ |
compaction.delta_commits | N | 5 | Max delta commits needed to trigger compaction, default 5 commits |
compaction.delta_seconds | N | 3600 | Max delta seconds time needed to trigger compaction, default 1 hour |
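The four trigger strategies reduce to two conditions combined in different ways. The function below is a sketch of that decision as the table words it, not Hudi's scheduler code: `CompactionTriggerSketch` and `shouldCompact` are hypothetical, and the comparison operators (commit count reached, elapsed time strictly greater) follow the wording of the remarks column.

```java
/** Toy decision function for compaction.trigger.strategy. */
final class CompactionTriggerSketch {

    static boolean shouldCompact(String strategy, int deltaCommits, long secondsSinceLastCompaction,
                                 int maxDeltaCommits, long maxDeltaSeconds) {
        boolean enoughCommits = deltaCommits >= maxDeltaCommits;                  // compaction.delta_commits
        boolean enoughTime = secondsSinceLastCompaction > maxDeltaSeconds;        // compaction.delta_seconds
        switch (strategy) {
            case "num_commits":
                return enoughCommits;
            case "time_elapsed":
                return enoughTime;
            case "num_and_time":
                return enoughCommits && enoughTime;
            case "num_or_time":
                return enoughCommits || enoughTime;
            default:
                throw new IllegalArgumentException("unknown strategy: " + strategy);
        }
    }

    public static void main(String[] args) {
        // Defaults from the table: 5 delta commits, 3600 seconds.
        System.out.println(shouldCompact("num_commits", 5, 120L, 5, 3600L));   // prints: true
        System.out.println(shouldCompact("num_and_time", 5, 120L, 5, 3600L));  // prints: false
        System.out.println(shouldCompact("num_or_time", 2, 4000L, 5, 3600L));  // prints: true
    }
}
```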
Currently only INSERT INTO is supported.