Hudi Streaming Ingestion

Applies to version 0.10.1.

1 DeltaStreamer

Capabilities:

  • Exactly-once ingestion of new events from Kafka, incremental imports from Sqoop, output of HiveIncrementalPuller, or files under a DFS folder
  • Support for multiple incoming record types such as JSON, Avro, or a custom record type
  • Manages checkpoints, rollback, and recovery
  • Leverages Avro schemas from DFS or the Confluent schema registry
  • Supports pluggable transformations

The full set of supported options is described in the command-line help:

[hoodie]$ spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` --help
Usage: <main class> [options]
Options:
--checkpoint
Resume Delta Streamer from this checkpoint.
--commit-on-errors
Commit even when some records failed to be written
Default: false
--compact-scheduling-minshare
Minshare for compaction as defined in
https://spark.apache.org/docs/latest/job-scheduling
Default: 0
--compact-scheduling-weight
Scheduling weight for compaction as defined in
https://spark.apache.org/docs/latest/job-scheduling
Default: 1
--continuous
Delta Streamer runs in continuous mode running source-fetch -> Transform
-> Hudi Write in loop
Default: false
--delta-sync-scheduling-minshare
Minshare for delta sync as defined in
https://spark.apache.org/docs/latest/job-scheduling
Default: 0
--delta-sync-scheduling-weight
Scheduling weight for delta sync as defined in
https://spark.apache.org/docs/latest/job-scheduling
Default: 1
--disable-compaction
Compaction is enabled for MoR table by default. This flag disables it
Default: false
--enable-hive-sync
Enable syncing to hive
Default: false
--filter-dupes
Should duplicate records from source be dropped/filtered out before
insert/bulk-insert
Default: false
--help, -h

--hoodie-conf
Any configuration that can be set in the properties file (using the CLI
parameter "--propsFilePath") can also be passed command line using this
parameter
Default: []
--max-pending-compactions
Maximum number of outstanding inflight/requested compactions. Delta Sync
will not happen unlessoutstanding compactions is less than this number
Default: 5
--min-sync-interval-seconds
the min sync interval of each sync in continuous mode
Default: 0
--op
Takes one of these values : UPSERT (default), INSERT (use when input is
purely new data/inserts to gain speed)
Default: UPSERT
Possible Values: [UPSERT, INSERT, BULK_INSERT]
--payload-class
subclass of HoodieRecordPayload, that works off a GenericRecord.
Implement your own, if you want to do something other than overwriting
existing value
Default: org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
--props
path to properties file on localfs or dfs, with configurations for
hoodie client, schema provider, key generator and data source. For
hoodie client props, sane defaults are used, but recommend use to
provide basic things like metrics endpoints, hive configs etc. For
sources, referto individual classes, for supported properties.
Default: file:///Users/vinoth/bin/hoodie/src/test/resources/delta-streamer-config/dfs-source.properties
--schemaprovider-class
subclass of org.apache.hudi.utilities.schema.SchemaProvider to attach
schemas to input & target table data, built in options:
org.apache.hudi.utilities.schema.FilebasedSchemaProvider.Source (See
org.apache.hudi.utilities.sources.Source) implementation can implement
their own SchemaProvider. For Sources that return Dataset<Row>, the
schema is obtained implicitly. However, this CLI option allows
overriding the schemaprovider returned by Source.
--source-class
Subclass of org.apache.hudi.utilities.sources to read data. Built-in
options: org.apache.hudi.utilities.sources.{JsonDFSSource (default),
AvroDFSSource, AvroKafkaSource, CsvDFSSource, HiveIncrPullSource,
JdbcSource, JsonKafkaSource, ORCDFSSource, ParquetDFSSource,
S3EventsHoodieIncrSource, S3EventsSource, SqlSource}
Default: org.apache.hudi.utilities.sources.JsonDFSSource
--source-limit
Maximum amount of data to read from source. Default: No limit For e.g:
DFS-Source => max bytes to read, Kafka-Source => max events to read
Default: 9223372036854775807
--source-ordering-field
Field within source record to decide how to break ties between records
with same key in input data. Default: 'ts' holding unix timestamp of
record
Default: ts
--spark-master
spark master to use.
Default: local[2]
* --table-type
Type of table. COPY_ON_WRITE (or) MERGE_ON_READ
* --target-base-path
base path for the target hoodie table. (Will be created if did not exist
first time around. If exists, expected to be a hoodie table)
* --target-table
name of the target table in Hive
--transformer-class
subclass of org.apache.hudi.utilities.transform.Transformer. Allows
transforming raw source Dataset to a target Dataset (conforming to
target schema) before writing. Default : Not set. E:g -
org.apache.hudi.utilities.transform.SqlQueryBasedTransformer (which
allows a SQL query templated to be passed as a transformation function)

Example configurations for Kafka and DFS sources can be found under hudi-utilities/src/test/resources/delta-streamer-config.

Example:

With Confluent Kafka and the schema registry running, use ksql-datagen to produce test data from the impressions.avro schema:

[confluent-5.0.0]$ bin/ksql-datagen schema=../impressions.avro format=avro topic=impressions key=impressionid

Ingest the data:

[hoodie]$ spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` \
--props file://${PWD}/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
--source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
--source-ordering-field impresssiontime \
--target-base-path file:\/\/\/tmp/hudi-deltastreamer-op \
--target-table uber.impressions \
--op BULK_INSERT

Existing tables can be migrated to Hudi ahead of time; see the migration guide.

(1) MultiTableDeltaStreamer

HoodieMultiTableDeltaStreamer is a wrapper around HoodieDeltaStreamer that ingests data into multiple tables in a single run.

Currently only sequential ingestion of the tables and the COPY_ON_WRITE table type are supported.

Unlike HoodieDeltaStreamer, each table to be ingested needs its own configuration.

Sample configurations can be found under hudi-utilities/src/test/resources/delta-streamer-config.

See the blog post for details. The main options and properties are listed below, followed by an example command.

  * --config-folder
      the path to the folder which contains all the table wise config files
    --base-path-prefix
      this is added to enable users to create all the hudi datasets for related tables under one path in FS. The datasets are then created under the path - <base_path_prefix>/<database>/<table_to_be_ingested>. However you can override the paths for every table by setting the property hoodie.deltastreamer.ingestion.targetBasePath

Per-table ingestion properties:

hoodie.deltastreamer.ingestion.tablesToBeIngested
      comma separated names of tables to be ingested in the format <database>.<table>, for example db1.table1,db1.table2
hoodie.deltastreamer.ingestion.targetBasePath
      if you wish to ingest a particular table in a separate path, you can mention that path here
hoodie.deltastreamer.ingestion.<database>.<table>.configFile
      path to the config file in the dedicated config folder which contains the overridden properties for the particular table to be ingested

[hoodie]$ spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` \
--props file://${PWD}/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties \
--config-folder file://tmp/hudi-ingestion-config \
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
--source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
--source-ordering-field impresssiontime \
--base-path-prefix file:\/\/\/tmp/hudi-deltastreamer-op \
--target-table uber.impressions \
--op BULK_INSERT

(2) Concurrency Control

Example with optimistic concurrency control (OCC); the required lock-provider properties need to be added to kafka-source.properties (see the sketch after the command):

[hoodie]$ spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` \
--props file://${PWD}/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
--source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
--source-ordering-field impresssiontime \
--target-base-path file:\/\/\/tmp/hudi-deltastreamer-op \
--target-table uber.impressions \
--op BULK_INSERT
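
The lock-provider settings are not included in the bundled kafka-source.properties; a minimal sketch, assuming a ZooKeeper-based lock provider (the ZooKeeper address, lock key, and base path below are placeholders), might look like:

hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
hoodie.write.lock.zookeeper.url=zk-host
hoodie.write.lock.zookeeper.port=2181
hoodie.write.lock.zookeeper.lock_key=impressions
hoodie.write.lock.zookeeper.base_path=/hudi/locks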

2 Checkpoints

DeltaStreamer uses checkpoints to record where the last ingestion run stopped. For a Kafka source the checkpoint is the Kafka offsets; for a DFS source it is the modification time of the latest ingested file. The checkpoint is stored under deltastreamer.checkpoint.key in the commit metadata inside .hoodie.

Ways to override the checkpoint (a combined sketch follows the list):

  • --checkpoint

    sets deltastreamer.checkpoint.reset_key to override the stored checkpoint

  • --source-limit

    sets the maximum amount of data to read from the source: bytes for a DFS source, number of events for a Kafka source
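
As a sketch only (paths, table name, and the checkpoint value are placeholders; the checkpoint format depends on the source, e.g. Kafka offsets for a Kafka source), the two flags could be combined with the earlier Kafka example like this:

[hoodie]$ spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` \
--props file://${PWD}/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
--source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
--source-ordering-field impresssiontime \
--target-base-path file:///tmp/hudi-deltastreamer-op \
--target-table uber.impressions \
--checkpoint <checkpoint-to-resume-from> \
--source-limit 5000000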

3 Schema Inference

By default Spark infers the schema from the source. A schema can also be provided explicitly in the following ways.

(1) Schema Registry

The latest schema can be fetched from an online schema registry.

Pass it to spark-submit like: --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=https://foo:bar@schemaregistry.org

hoodie.deltastreamer.schemaprovider.registry.url
      The schema of the source you are reading from. Example: https://foo:bar@schemaregistry.org
hoodie.deltastreamer.schemaprovider.registry.targetUrl
      The schema of the target you are writing to. Example: https://foo:bar@schemaregistry.org
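
Putting the two together on the command line (URL and credentials are placeholders):

--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
--hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=https://foo:bar@schemaregistry.org \
--hoodie-conf hoodie.deltastreamer.schemaprovider.registry.targetUrl=https://foo:bar@schemaregistry.org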

(2) JDBC

Pass the connection to spark-submit like: --hoodie-conf hoodie.deltastreamer.jdbcbasedschemaprovider.connection.url=jdbc:postgresql://localhost/test?user=fred&password=secret

hoodie.deltastreamer.schemaprovider.source.schema.jdbc.connection.url
      The JDBC URL to connect to. You can specify source specific connection properties in the URL. Example: jdbc:postgresql://localhost/test?user=fred&password=secret
hoodie.deltastreamer.schemaprovider.source.schema.jdbc.driver.type
      The class name of the JDBC driver to use to connect to this URL. Example: org.h2.Driver
hoodie.deltastreamer.schemaprovider.source.schema.jdbc.username
      Username for the connection. Example: fred
hoodie.deltastreamer.schemaprovider.source.schema.jdbc.password
      Password for the connection. Example: secret
hoodie.deltastreamer.schemaprovider.source.schema.jdbc.dbtable
      The table with the schema to reference. Example: test_database.test1_table or test1_table
hoodie.deltastreamer.schemaprovider.source.schema.jdbc.timeout
      The number of seconds the driver will wait for a Statement object to execute. Zero means there is no limit. In the write path, this option depends on how JDBC drivers implement the API setQueryTimeout, e.g., the h2 JDBC driver checks the timeout of each query instead of an entire JDBC batch. Default: 0
hoodie.deltastreamer.schemaprovider.source.schema.jdbc.nullable
      If true, all columns are nullable. Default: true
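
Put together in a --props file (or passed individually via --hoodie-conf), the keys above might look like the following sketch; the connection details and driver class are placeholders:

hoodie.deltastreamer.schemaprovider.source.schema.jdbc.connection.url=jdbc:postgresql://localhost/test?user=fred&password=secret
hoodie.deltastreamer.schemaprovider.source.schema.jdbc.driver.type=org.postgresql.Driver
hoodie.deltastreamer.schemaprovider.source.schema.jdbc.username=fred
hoodie.deltastreamer.schemaprovider.source.schema.jdbc.password=secret
hoodie.deltastreamer.schemaprovider.source.schema.jdbc.dbtable=test1_table
hoodie.deltastreamer.schemaprovider.source.schema.jdbc.timeout=0
hoodie.deltastreamer.schemaprovider.source.schema.jdbc.nullable=true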

(3) File-based

Define the schemas in .avsc files:

hoodie.deltastreamer.schemaprovider.source.schema.file
      The schema of the source you are reading from. Example: an example schema file
hoodie.deltastreamer.schemaprovider.target.schema.file
      The schema of the target you are writing to. Example: an example schema file
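
For example, with the file-based provider class mentioned in the CLI help above (the .avsc paths are placeholders):

--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--hoodie-conf hoodie.deltastreamer.schemaprovider.source.schema.file=file:///path/to/source.avsc \
--hoodie-conf hoodie.deltastreamer.schemaprovider.target.schema.file=file:///path/to/target.avsc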

(4) Schema Post Processor

The schema obtained from any of the providers above can be further transformed before use; this requires implementing SchemaPostProcessor.

4 Sources

(1) DFS

The following formats are supported (a sketch of an invocation follows the list):

  • CSV
  • AVRO
  • JSON
  • PARQUET
  • ORC
  • HUDI
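
A sketch of ingesting Parquet files from a DFS folder; the input/output paths, table name, and ordering field are placeholders, hoodie.deltastreamer.source.dfs.root is assumed to be the config pointing at the input folder, and depending on the format a schema provider from section 3 may also be required:

[hoodie]$ spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` \
--source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
--hoodie-conf hoodie.deltastreamer.source.dfs.root=file:///tmp/parquet-input \
--source-ordering-field ts \
--table-type COPY_ON_WRITE \
--target-base-path file:///tmp/hudi-dfs-ingest \
--target-table parquet_table \
--op INSERT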

(2) Kafka

Data can be read directly from a Kafka cluster; exactly-once semantics, checkpointing, and pluggable transformations work as described for HoodieDeltaStreamer above.

The following formats are supported:

  • AVRO
  • JSON

(3) S3 Events

(4) JDBC

The table can be read in full, or incrementally based on the checkpoint.

hoodie.deltastreamer.jdbc.url
      URL of the JDBC connection. Example: jdbc:postgresql://localhost/test
hoodie.deltastreamer.jdbc.user
      User to use for authentication of the JDBC connection. Example: fred
hoodie.deltastreamer.jdbc.password
      Password to use for authentication of the JDBC connection. Example: secret
hoodie.deltastreamer.jdbc.password.file
      If you prefer to use a password file for the connection
hoodie.deltastreamer.jdbc.driver.class
      Driver class to use for the JDBC connection
hoodie.deltastreamer.jdbc.table.name
      Name of the table to fetch data from. Example: my_table
hoodie.deltastreamer.jdbc.table.incr.column.name
      If run in incremental mode, this field will be used to pull new data incrementally
hoodie.deltastreamer.jdbc.incr.pull
      Will the JDBC connection perform an incremental pull?
hoodie.deltastreamer.jdbc.extra.options.
      How to pass extra configurations that would normally be specified as spark.read.option(). Example: hoodie.deltastreamer.jdbc.extra.options.fetchSize=100, hoodie.deltastreamer.jdbc.extra.options.upperBound=1, hoodie.deltastreamer.jdbc.extra.options.lowerBound=100
hoodie.deltastreamer.jdbc.storage.level
      Used to control the persistence level. Default: MEMORY_AND_DISK_SER
hoodie.deltastreamer.jdbc.incr.fallback.to.full.fetch
      Boolean which, if set to true, makes an incremental fetch fall back to a full fetch when there is any error in the incremental read. Default: false
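
A sketch of the properties for an incremental pull (connection details, driver class, and the incremental column are placeholders), used together with --source-class org.apache.hudi.utilities.sources.JdbcSource:

hoodie.deltastreamer.jdbc.url=jdbc:postgresql://localhost/test
hoodie.deltastreamer.jdbc.user=fred
hoodie.deltastreamer.jdbc.password=secret
hoodie.deltastreamer.jdbc.driver.class=org.postgresql.Driver
hoodie.deltastreamer.jdbc.table.name=my_table
hoodie.deltastreamer.jdbc.incr.pull=true
hoodie.deltastreamer.jdbc.table.incr.column.name=updated_at
hoodie.deltastreamer.jdbc.extra.options.fetchSize=100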

(5) SQL

The SQL source is mainly used to backfill data for specific partitions. It does not update deltastreamer.checkpoint.key; instead, the checkpoint is set to that of the latest successful commit.

To fetch and use the latest incremental checkpoint, set hoodie.write.meta.key.prefixes = 'deltastreamer.checkpoint.key'.

The Spark SQL query is specified via hoodie.deltastreamer.source.sql.sql.query = 'select * from source_table'.
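
A sketch of a backfill run for one partition (the query, table name, and paths are placeholders):

[hoodie]$ spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` \
--source-class org.apache.hudi.utilities.sources.SqlSource \
--hoodie-conf "hoodie.deltastreamer.source.sql.sql.query=select * from source_table where ds = '2021-12-01'" \
--hoodie-conf hoodie.write.meta.key.prefixes='deltastreamer.checkpoint.key' \
--target-base-path file:///tmp/hudi-sql-backfill \
--target-table my_table \
--op INSERT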

5 Flink Ingestion

(1) CDC Ingestion

Change Data Capture (CDC) tracks row-level changes in a source database. Two approaches are recommended:

  • Connect to the database directly with flink-cdc-connectors and sync the binlog into Hudi. The advantage is that no message queue is needed; the drawback is the extra load it puts on the database.
  • Consume from a message queue using a Flink CDC format. The advantage is better scalability; the drawback is the dependency on a message queue.

Note:

  • If the upstream data cannot be guaranteed to be ordered, explicitly set write.precombine.field.
  • For MOR tables, deletes cannot currently be handled correctly under an event-time sequence and may cause data loss; it is better to switch to changelog mode via changelog.enabled.

(2) Bulk Insert

Use bulk insert to load existing snapshot data.

Note:

  • Bulk insert skips serialization and data merging; the user has to guarantee the uniqueness of the data.
  • Bulk insert is more efficient in batch execution mode. By default, batch execution mode sorts the input by partition path, avoiding the write throughput degradation caused by frequent file switches.
  • The parallelism of bulk insert is set by write.tasks and affects the number of small files. In theory the parallelism equals the number of buckets, but a new file is rolled once a bucket fills up, so the number of files is >= write.bucket_assign.tasks.

Configuration (a Flink SQL sketch follows the table):

write.operation
      Required: true. Default: upsert. Set to bulk_insert to enable this function.
write.tasks
      Required: false. Default: 4. The parallelism of bulk_insert; the number of files >= write.bucket_assign.tasks.
write.bulk_insert.shuffle_by_partition
      Required: false. Default: true. Whether to shuffle data by the partition field before writing. Enabling this option reduces the number of small files, but there may be a risk of data skew.
write.bulk_insert.sort_by_partition
      Required: false. Default: true. Whether to sort data by the partition field before writing. Enabling this option reduces the number of small files when a write task writes multiple partitions.
write.sort.memory
      Required: false. Default: 128. Available managed memory of the sort operator, in MB.
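
A minimal Flink SQL sketch (the table schema, path, and source table are placeholders) that loads snapshot data with bulk_insert:

CREATE TABLE hudi_snapshot (
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(20),
  ts   TIMESTAMP(3),
  `partition` VARCHAR(20)
) PARTITIONED BY (`partition`) WITH (
  'connector' = 'hudi',
  'path' = 'file:///tmp/hudi_snapshot',
  'table.type' = 'MERGE_ON_READ',
  'write.operation' = 'bulk_insert',   -- switch from the default upsert to bulk_insert
  'write.tasks' = '4'                  -- parallelism of the bulk_insert write tasks
);

-- run in batch execution mode for best bulk_insert throughput
INSERT INTO hudi_snapshot SELECT * FROM source_snapshot;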

(3) Index Bootstrap

After the snapshot data has been loaded, incremental data can be ingested; the index bootstrap function is used to avoid duplicate records.

Note: if this process feels too slow, you can add resources and write the snapshot data in streaming mode, then reduce resources (or enable rate limiting) when writing the incremental data.

Configuration:

index.bootstrap.enabled
      Required: true. Default: false. When index bootstrap is enabled, the remaining records in the Hudi table are loaded into the Flink state in one go.
index.partition.regex
      Required: false. Default: *. Optimization option. Set a regular expression to filter partitions; by default, all partitions are loaded into Flink state.

Usage:

1) Use CREATE TABLE to create a table with the correct table.type.

2) Set index.bootstrap.enabled = true to enable index bootstrap (see the sketch after these steps).

3) Set a tolerable checkpoint-failure limit in flink-conf.yaml, execution.checkpointing.tolerable-failed-checkpoints = n, according to the checkpoint scheduling interval.

4) When the first checkpoint succeeds, the index bootstrap has finished.

5) After the index bootstrap is finished, the job can be stopped with a savepoint (or an externalized checkpoint can be used directly).

6) Restart the job with index bootstrap disabled.
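
A sketch of steps 1) and 2) in Flink SQL (the schema and path are placeholders; table.type must match the existing table):

CREATE TABLE hudi_existing (
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(20),
  ts   TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'path' = 'file:///tmp/hudi_existing',   -- path of the table that already holds the snapshot data
  'table.type' = 'MERGE_ON_READ',
  'index.bootstrap.enabled' = 'true'      -- step 2): load existing records into Flink state
);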

Note

  • Index bootstrap is blocking, so checkpoints cannot complete while it is running.
  • Index bootstrap is triggered by the input data, so there must be at least one record in each partition.
  • Index bootstrap runs concurrently; progress can be observed in the logs via the messages "finish loading the index under partition" and "Load record form file".
  • The index bootstrap only needs to succeed once (the first successful checkpoint); the index does not need to be reloaded when recovering from a checkpoint.

(4) Changelog Mode

Hudi can retain all intermediate change messages so that, combined with Flink stateful computation, near-real-time data-warehouse ETL pipelines (incremental computation) can be built. Hudi MOR tables store messages in row format and can retain all changelogs (integrated at the format level). All changelog records can then be consumed with the Flink streaming reader.

Configuration (a Flink SQL sketch follows the table):

changelog.enabled
      Required: false. Default: false. Off by default, giving upsert semantics: only the merged messages are guaranteed to be kept, and intermediate changes may be merged away. Set to true to allow consumption of all changes.
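
A sketch of a MOR table with changelog mode enabled (the schema and path are placeholders):

CREATE TABLE hudi_changelog (
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(20),
  ts   TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'path' = 'file:///tmp/hudi_changelog',
  'table.type' = 'MERGE_ON_READ',
  'changelog.enabled' = 'true'   -- retain intermediate changes for downstream streaming reads
);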

Note

  • Batch (snapshot) reads still merge the intermediate changes, regardless of whether the format has retained the intermediate changelog messages.
  • With changelog mode enabled, the async compaction task still merges the changelog records into a single record, so if the streaming source does not consume them in time, only the compacted record can be read. The buffer time can be extended with the compaction options, e.g. compaction.delta_commits and compaction.delta_seconds.

(5) Append Mode

For the INSERT operation, a COW table does not merge small files by default, while a MOR table does.

For COW tables, small-file merging can be enabled with write.insert.cluster (a Flink SQL sketch follows the table).

write.insert.cluster
      Required: false. Default: false. Whether to merge small files while ingesting. For COW tables, enable this option to turn on the small-file merging strategy (no deduplication of keys, and throughput will be affected).
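
A sketch of an append-mode COW table with small-file merging enabled (the schema and path are placeholders):

CREATE TABLE hudi_append (
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(20),
  ts   TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'path' = 'file:///tmp/hudi_append',
  'table.type' = 'COPY_ON_WRITE',
  'write.operation' = 'insert',        -- append mode: plain inserts, no upsert/deduplication
  'write.insert.cluster' = 'true'      -- COW only: merge small files while ingesting
);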

(6) Rate Limit

write.rate.limit
      Required: false. Default: 0 (rate limiting disabled).

(7) Streaming Query

(8) Incremental Query

6 Kafka Connect Sink

References