Hudi Data Writing

Applies to version 0.10.1.

1 Spark Datasource Writer

  • HoodieWriteConfig

  • TABLE_NAME: required

  • DataSourceWriteOptions

    • RECORDKEY_FIELD_OPT_KEY

      Required. Uniquely identifies a record within a partition; with a global index it can be made globally unique. Default: uuid

    • PARTITIONPATH_FIELD_OPT_KEY

      Required. Specifies the partition column; an empty string turns partitioning off. URL_ENCODE_PARTITIONING_OPT_KEY controls URL encoding of partition values, and HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY is used when syncing to Hive. Default: partitionpath

    • PRECOMBINE_FIELD_OPT_KEY

      Required. When the same key appears more than once in the same batch, the record with the largest value in this field is the one written. This strategy is not applied once OverwriteWithLatestAvroPayload is configured. Default: ts

    • OPERATION_OPT_KEY

      Specifies the write operation. Default: UPSERT_OPERATION_OPT_VAL; alternatives: BULK_INSERT_OPERATION_OPT_VAL, INSERT_OPERATION_OPT_VAL, DELETE_OPERATION_OPT_VAL

    • TABLE_TYPE_OPT_KEY

      The table type to write; it must match the type used when the table was created, and SaveMode.Append should be used in Spark. Default: COW_TABLE_TYPE_OPT_VAL; alternative: MOR_TABLE_TYPE_OPT_VAL

    • KEYGENERATOR_CLASS_OPT_KEY

      See Key Generation for details.

    • HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY

      Used when syncing to Hive; specifies how partition values are extracted (i.e. whether and how the table is partitioned).

      Default: classOf[SlashEncodedDayPartitionValueExtractor].getCanonicalName

      Alternatives: classOf[MultiPartKeysValueExtractor].getCanonicalName, classOf[TimestampBasedKeyGenerator].getCanonicalName, classOf[NonPartitionedExtractor].getCanonicalName, classOf[GlobalDeleteKeyGenerator].getCanonicalName (when OPERATION_OPT_KEY is DELETE_OPERATION_OPT_VAL)

    inputDF.write()
    .format("org.apache.hudi")
    .options(clientOpts) //Where clientOpts is of type Map[String, String]. clientOpts can include any other options necessary.
    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key")
    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition")
    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp")
    .option(HoodieWriteConfig.TABLE_NAME, tableName)
    .mode(SaveMode.Append)
    .save(basePath);
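
    The remaining options described above (write operation, table type, key generator, Hive partition extractor) are passed the same way. A minimal Scala sketch, using the literal config keys that the *_OPT_KEY constants resolve to; ComplexKeyGenerator and MultiPartKeysValueExtractor are illustrative choices, not requirements:

    import org.apache.spark.sql.SaveMode

    inputDF.write.format("org.apache.hudi")
      .option("hoodie.datasource.write.recordkey.field", "_row_key")      // RECORDKEY_FIELD_OPT_KEY
      .option("hoodie.datasource.write.partitionpath.field", "partition") // PARTITIONPATH_FIELD_OPT_KEY
      .option("hoodie.datasource.write.precombine.field", "timestamp")    // PRECOMBINE_FIELD_OPT_KEY
      .option("hoodie.datasource.write.operation", "insert")              // OPERATION_OPT_KEY
      .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")      // TABLE_TYPE_OPT_KEY, must match the table
      .option("hoodie.datasource.write.keygenerator.class",
        "org.apache.hudi.keygen.ComplexKeyGenerator")                     // KEYGENERATOR_CLASS_OPT_KEY
      .option("hoodie.datasource.hive_sync.partition_extractor_class",
        "org.apache.hudi.hive.MultiPartKeysValueExtractor")               // HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY
      .option("hoodie.table.name", tableName)
      .mode(SaveMode.Append)
      .save(basePath)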

    Spark SQL examples:

    insert into h0 select 1, 'a1', 20;

    -- insert static partition
    insert into h_p0 partition(dt = '2021-01-02') select 1, 'a1';

    -- insert dynamic partition
    insert into h_p0 select 1, 'a1', dt;

    -- insert dynamic partition
    insert into h_p1 select 1 as id, 'a1', '2021-01-03' as dt, '19' as hh;

    -- insert overwrite table
    insert overwrite table h0 select 1, 'a1', 20;

    -- insert overwrite table with static partition
    insert overwrite h_p0 partition(dt = '2021-01-02') select 1, 'a1';

    -- insert overwrite table with dynamic partition
    insert overwrite table h_p1 select 2 as id, 'a2', '2021-01-03' as dt, '19' as hh;

    Note:

    • Insert

      • strict mode: when a record with a duplicate key is inserted, a COW table throws an exception while a MOR table updates the existing record.
      • non-strict mode: behaves like the insert operation of the Spark DataSource writer. The mode is switched via hoodie.sql.insert.mode (see the sketch after this list).
    • Bulk insert

      Plain insert is used by default; bulk insert can be enabled via hoodie.sql.bulk.insert.enable.
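
    A minimal sketch of switching these modes from Spark SQL before running an insert (h0 is the example table used above):

      // Hedged sketch: set the session-level flags mentioned above, then insert.
      spark.sql("set hoodie.sql.insert.mode = non-strict")   // or 'strict' / 'upsert'
      spark.sql("set hoodie.sql.bulk.insert.enable = true")  // route INSERT INTO through bulk insert
      spark.sql("insert into h0 select 1, 'a1', 20")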

(1) Insert overwrite for a non-partitioned table

More efficient than deleting the existing data and then inserting. The Hudi cleaner eventually cleans up the file groups of the previous snapshot.

insert overwrite table h0 select 1, 'a1', 20;

(2) Insert overwrite for a partitioned table

More efficient than upsert because it bypasses the indexing, precombining and repartitioning steps of the upsert write path.

insert overwrite table h_p1 select 2 as id, 'a2', '2021-01-03' as dt, '19' as hh;

(3) Delete

Hudi supports two delete modes; see Delete support in Hudi for details.

  • Soft delete

    Keep the record key and set all other fields to null.

  • Hard delete

    Physically remove the record.

    This can be done in any of the following three ways:

    • With the DataSource, set OPERATION_OPT_KEY to DELETE_OPERATION_OPT_VAL.

      // df holds the records to delete (e.g. keys read back from the table)
      val deletes = dataGen.generateDeletes(df.collectAsList())
      val deleteDF = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
      deleteDF.write.format("org.apache.hudi").
        options(getQuickstartWriteConfigs).
        option(OPERATION_OPT_KEY, "delete"). // i.e. DELETE_OPERATION_OPT_VAL
        option(PRECOMBINE_FIELD_OPT_KEY, "ts").
        option(RECORDKEY_FIELD_OPT_KEY, "uuid").
        option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
        option(TABLE_NAME, tableName).
        mode(Append).
        save(basePath)
    • With the DataSource, set PAYLOAD_CLASS_OPT_KEY to "org.apache.hudi.EmptyHoodieRecordPayload". This deletes all records in the batch being committed.

      deleteDF // dataframe containing just records to be deleted
        .write().format("org.apache.hudi")
        .option(...) // Add HUDI options like record-key, partition-path and others as needed for your setup
        // specify record_key, partition_key, precombine_fieldkey & usual params
        .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.EmptyHoodieRecordPayload")
        .mode(SaveMode.Append)
        .save(basePath)
    • With the DataSource or DeltaStreamer, add a _hoodie_is_deleted column to the records to be deleted and set it to true (a sketch follows below).
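
      A minimal sketch of the _hoodie_is_deleted approach, assuming a DataFrame named toDelete that already carries the table schema (the other options follow the earlier examples):

      import org.apache.spark.sql.SaveMode
      import org.apache.spark.sql.functions.lit

      // Mark the rows as deleted and write them back with a regular upsert.
      val toDeleteDF = toDelete.withColumn("_hoodie_is_deleted", lit(true))

      toDeleteDF.write.format("org.apache.hudi")
        .options(getQuickstartWriteConfigs)
        .option(PRECOMBINE_FIELD_OPT_KEY, "ts")
        .option(RECORDKEY_FIELD_OPT_KEY, "uuid")
        .option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath")
        .option(TABLE_NAME, tableName)
        .mode(SaveMode.Append)
        .save(basePath)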

(4) Concurrency control

The hudi-spark module provides a DataSource API to read and write Spark DataFrames to and from a Hudi table. See concurrency control concepts for details.

Example:

inputDF.write.format("hudi")
.options(getQuickstartWriteConfigs)
.option(PRECOMBINE_FIELD_OPT_KEY, "ts")
.option("hoodie.cleaner.policy.failed.writes", "LAZY")
.option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
.option("hoodie.write.lock.zookeeper.url", "zookeeper")
.option("hoodie.write.lock.zookeeper.port", "2181")
.option("hoodie.write.lock.zookeeper.lock_key", "test_table")
.option("hoodie.write.lock.zookeeper.base_path", "/test")
.option(RECORDKEY_FIELD_OPT_KEY, "uuid")
.option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath")
.option(TABLE_NAME, tableName)
.mode(Overwrite)
.save(basePath)
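
Note that the snippet above sets only the ZooKeeper coordinates; optimistic concurrency control normally also needs a lock provider. A hedged addition to the same option chain (the class name is assumed to be the ZooKeeper-based provider bundled with Hudi; verify it against your version):

// Added alongside the ZooKeeper options above:
.option("hoodie.write.lock.provider",
  "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider")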

(5) Commit notifications

Hudi provides callback notifications on write commits; a configuration sketch follows the list below.

  • HTTP

    | Config | Description | Required | Default |
    | --- | --- | --- | --- |
    | TURN_CALLBACK_ON | Turn commit callback on/off | optional | false (callbacks off) |
    | CALLBACK_HTTP_URL | Callback host to be sent along with callback messages | required | N/A |
    | CALLBACK_HTTP_TIMEOUT_IN_SECONDS | Callback timeout in seconds | optional | 3 |
    | CALLBACK_CLASS_NAME | Full path of the callback class; must be a subclass of HoodieWriteCommitCallback | optional | org.apache.hudi.callback.impl.HoodieWriteCommitHttpCallback |
    | CALLBACK_HTTP_API_KEY_VALUE | HTTP callback API key | optional | hudi_write_commit_http_callback |

  • Kafka

    | Config | Description | Required | Default |
    | --- | --- | --- | --- |
    | TOPIC | Kafka topic name to publish timeline activity into | required | N/A |
    | PARTITION | It may be desirable to serialize all changes into a single Kafka partition for strict ordering. By default, Kafka messages are keyed by table name, which guarantees ordering at the table level but not globally (or when new partitions are added) | required | N/A |
    | RETRIES | Times to retry the produce | optional | 3 |
    | ACKS | Kafka acks level; all by default to ensure strong durability | optional | all |
    | BOOTSTRAP_SERVERS | Bootstrap servers of the Kafka cluster, used for publishing commit metadata | required | N/A |

  • Custom implementation

    Extend HoodieWriteCommitCallback to implement your own commit callback. See the API for details.
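
As referenced above, a sketch of enabling the HTTP callback through write options. The literal key strings are assumed mappings of the config names in the HTTP table; check them against the Hudi 0.10.1 configuration reference:

// Hedged sketch: turn on the HTTP commit callback for a write.
inputDF.write.format("org.apache.hudi")
  .option("hoodie.write.commit.callback.on", "true")                            // TURN_CALLBACK_ON
  .option("hoodie.write.commit.callback.http.url", "http://callback-host/api")  // CALLBACK_HTTP_URL
  .option("hoodie.write.commit.callback.http.timeout.seconds", "3")             // CALLBACK_HTTP_TIMEOUT_IN_SECONDS
  .option("hoodie.write.commit.callback.http.api.key", "hudi_write_commit_http_callback") // CALLBACK_HTTP_API_KEY_VALUE
  .option(RECORDKEY_FIELD_OPT_KEY, "uuid")
  .option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath")
  .option(PRECOMBINE_FIELD_OPT_KEY, "ts")
  .option(TABLE_NAME, tableName)
  .mode(Append)
  .save(basePath)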

Sink table configuration

| Option Name | Required | Default | Remarks |
| --- | --- | --- | --- |
| path | Y | N/A | Base path for the target Hudi table. The path is created if it does not exist; otherwise a Hudi table is expected to have been initialized there |
| table.type | N | COPY_ON_WRITE | Type of table to write: COPY_ON_WRITE or MERGE_ON_READ |
| write.operation | N | upsert | The write operation this write should perform (insert and upsert are supported) |
| write.precombine.field | N | ts | Field used in pre-combining before the actual write. When two records have the same key, the one with the largest value for the precombine field is picked, as determined by Object.compareTo(..) |
| write.payload.class | N | OverwriteWithLatestAvroPayload.class | Payload class to use. Override this if you want to roll your own merge logic when upserting/inserting; doing so renders the precombine setting ineffective |
| write.insert.drop.duplicates | N | false | Flag to indicate whether to drop duplicates on insert. By default insert accepts duplicates, to gain extra performance |
| write.ignore.failed | N | true | Flag to indicate whether to ignore any non-exception error (e.g. a writestatus error) within a checkpoint batch. True by default (in favor of streaming progress over data integrity) |
| hoodie.datasource.write.recordkey.field | N | uuid | Record key field; used as the recordKey component of HoodieKey. The actual value is obtained by invoking .toString() on the field value. Nested fields can be specified using dot notation, e.g. a.b.c |
| hoodie.datasource.write.keygenerator.class | N | SimpleAvroKeyGenerator.class | Key generator class that extracts the key from the incoming record |
| write.tasks | N | 4 | Parallelism of the tasks that do the actual write; default 4 |
| write.batch.size.MB | N | 128 | Batch buffer size in MB used to flush data into the underlying filesystem |

Async compaction strategy for MOR tables

| Option Name | Required | Default | Remarks |
| --- | --- | --- | --- |
| compaction.async.enabled | N | true | Async compaction; enabled by default for MOR |
| compaction.trigger.strategy | N | num_commits | Strategy to trigger compaction: 'num_commits' triggers after N delta commits; 'time_elapsed' triggers when more than N seconds have elapsed since the last compaction; 'num_and_time' triggers when both NUM_COMMITS and TIME_ELAPSED are satisfied; 'num_or_time' triggers when either is satisfied. Default is 'num_commits' |
| compaction.delta_commits | N | 5 | Max delta commits needed to trigger compaction; default 5 commits |
| compaction.delta_seconds | N | 3600 | Max delta seconds needed to trigger compaction; default 1 hour |
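
The two tables above appear to be Flink sink options; assuming that, here is a minimal sketch of wiring a few of them into a sink table definition from Scala (the table name, schema and path are illustrative, and tableEnv is an existing TableEnvironment):

// Hedged sketch: a MOR sink table combining the write and compaction options above.
tableEnv.executeSql(
  """
    |CREATE TABLE hudi_sink (
    |  uuid STRING,
    |  name STRING,
    |  ts TIMESTAMP(3),
    |  `partition` STRING
    |) PARTITIONED BY (`partition`) WITH (
    |  'connector' = 'hudi',
    |  'path' = 'hdfs:///tmp/hudi_sink',
    |  'table.type' = 'MERGE_ON_READ',
    |  'write.operation' = 'upsert',
    |  'write.precombine.field' = 'ts',
    |  'hoodie.datasource.write.recordkey.field' = 'uuid',
    |  'write.tasks' = '4',
    |  'compaction.async.enabled' = 'true',
    |  'compaction.trigger.strategy' = 'num_commits',
    |  'compaction.delta_commits' = '5'
    |)
  """.stripMargin)

Rows are then written with INSERT INTO hudi_sink SELECT ..., which matches the note below that only INSERT INTO is currently supported.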

Currently only INSERT INTO is supported.

References