Hudi并发控制

适用于版本0.10.1。

本文讨论在多个写入时的并发模型。

使用DeltaStreamerHudi datasource

1 支持的并发控制

(1) MVCC

使用单一写入和多个读取保证快照隔离。

(2) 乐观并发

当前处于试验阶段,并且需要ZK或Hive Metastore。

支持文件级别OCC(Optimitic Concurrency Control)。如没有覆盖的多个写可以成功。

2 单写保证

可通过write operations理解Judi datasource和delta streamer的不同保证。

  • Upsert

    目标表不会展示重复

  • Insert

    dedup开启后不会有重复

  • Bulk_insert

    dedup开启后不会有重复

  • Incremental pull

    数据消费和检查点不会乱序

3 多写保证

由于使用了OCC,在去重和顺序保证上有变化。

  • Upsert

    目标表不会展示重复

  • Insert

    即使开启dedup也可能有重复

  • Bulk_insert

    即使开启dedup也可能有重复

  • Incremental pull

    可能由于写作业异步完成导致乱序

4 启用多写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 开启OCC
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.lock.provider=<lock-provider-classname>

# 3中不同的锁配置
# 1 ZK
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
hoodie.write.lock.zookeeper.url
hoodie.write.lock.zookeeper.port
hoodie.write.lock.zookeeper.lock_key
hoodie.write.lock.zookeeper.base_path

# 2 HiveMetastore
# URI在运行时从Hadoop配置中加载
hoodie.write.lock.provider=org.apache.hudi.hive.HiveMetastoreBasedLockProvider
hoodie.write.lock.hivemetastore.database
hoodie.write.lock.hivemetastore.table

# 3 Amazon DynamoDB
hoodie.write.lock.provider=org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider
hoodie.write.lock.dynamodb.table
hoodie.write.lock.dynamodb.partition_key
hoodie.write.lock.dynamodb.region

# 访问AWS所需证书配置,否则使用DefaultAWSCredentialsProviderChain.
# https://hudi.apache.org/cn/docs/concurrency_control/#:~:text=DefaultAWSCredentialsProviderChain.
hoodie.aws.access.key
hoodie.aws.secret.key
hoodie.aws.session.token

5 DataSource Writer

hudi-spark模块提供API将Spark Dataframe转换为hudi表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 示例
inputDF.write.format("hudi")
.options(getQuickstartWriteConfigs)
.option(PRECOMBINE_FIELD_OPT_KEY, "ts")
.option("hoodie.cleaner.policy.failed.writes", "LAZY")
.option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
.option("hoodie.write.lock.zookeeper.url", "zookeeper")
.option("hoodie.write.lock.zookeeper.port", "2181")
.option("hoodie.write.lock.zookeeper.lock_key", "test_table")
.option("hoodie.write.lock.zookeeper.base_path", "/test")
.option(RECORDKEY_FIELD_OPT_KEY, "uuid")
.option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath")
.option(TABLE_NAME, tableName)
.mode(Overwrite)
.save(basePath)

6 DeltaStreamer

hudi-utilities-bundle模块中的HoodieDeltaStreamer提供了DFS、Kafka等多种来源消费数据的能力。

使用OCC需要向来源配置文件中添加前述配置。

1
2
3
4
5
6
7
8
9
# 使用DeltaStreamer消费Kafka数据
[hoodie]$ spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` \
--props file://${PWD}/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
--source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
--source-ordering-field impresssiontime \
--target-base-path file:\/\/\/tmp/hudi-deltastreamer-op \
--target-table uber.impressions \
--op BULK_INSERT

7 使用乐观并发控制的最佳实践

由于网络、并发量和提交操作可能导致获取锁超时,可以通过以下配置设置重试:

1
2
3
4
5
6
7
# 使用原生锁配置,全局配置
hoodie.write.lock.wait_time_ms
hoodie.write.lock.num_retries

# 客户端配置,弥补全局配置不可变动缺点
hoodie.write.lock.client.wait_time_ms
hoodie.write.lock.client.num_retries

8 取消多写

1
2
hoodie.write.concurrency.mode=single_writer
hoodie.cleaner.policy.failed.writes=EAGER

9 警告

如果使用WriteClient多写,不建议复用实例。

参考资料