作为其数据湖框架,Apache,Hudi,与,Apache,4.Flink,集成

日期:2020-10-23 16:19:12 来源:互联网 编辑:小狐 阅读人数:833

简介:纵观大数据领域成熟、活跃、有生命力的框架,无一不是设计优雅,能与其他框架相互融合,彼此借力,各专所长。

Apache Hudi 是由 Uber 并开源的数据湖框架,它于 2019 年 1 月进入 Apache 孵化器孵化,次年 5 月份顺利毕业晋升为 Apache 顶级项目。是当前最为热门的数据湖框架之一。

1. 为何要解耦

Hudi 自诞生至今一直使用 Spark 作为其数据处理引擎。如果用户想使用 Hudi 作为其数据湖框架,就必须在其平台技术栈中引入 Spark。放在几年前,使用 Spark 作为大数据处理引擎可以说是很平常甚至是理所当然的事。因为 Spark 既可以进行批处理也可以使用微批模拟流,流批一体,一套引擎解决流、批问题。然而,近年来,随着大数据技术的发展,同为大数据处理引擎的 Flink 逐渐进入人们的视野,并在计算引擎领域获占据了一定的市场,大数据处理引擎不再是一家独大。在大数据技术社区、论坛等领地,Hudi 是否支持使用 Flink 计算引擎的的声音开始逐渐出现,并日渐频繁。所以使 Hudi 支持 Flink 引擎是个有价值的事情,而集成 Flink 引擎的前提是 Hudi 与 Spark 解耦。

2. 解耦难点

Hudi 内部使用 Spark API 像我们平时使用 List 一样稀松平常。自从数据源读取数据,到最终写出数据到表,无处不是使用 Spark RDD 作为主要数据结构,甚至连普通的工具类,都使用 Spark API 实现,可以说 Hudi 就是用 Spark 实现的一个通用数据湖框架,它与 Spark 的绑定可谓是深入骨髓。

此外,此次解耦后集成的首要引擎是 Flink。而 Flink 与 Spark 在核心抽象上差异很大。Spark 认为数据是有界的,其核心抽象是一个有限的数据集合。而 Flink 则认为数据的本质是流,其核心抽象 DataStream 中包含的是各种对数据的操作。同时,Hudi 内部还存在多处同时操作多个 RDD,以及将一个 RDD 的处理结果与另一个 RDD 联合处理的情况,这种抽象上的区别以及实现时对于中间结果的复用,使得 Hudi 在解耦抽象上难以使用统一的 API 同时操作 RDD 和 DataStream。

3. 解耦思路

解耦原则:

1统一泛型。Spark API 用到的 JavaRDD,JavaRDD,JavaRDD 统一使用泛型 I,K,O 代替。

2去 Spark 化。抽象层所有 API 必须与 Spark 无关。涉及到具体操作难以在抽象层实现的,改写为抽象方法,引入 Spark 子类实现。

例如:Hudi 内部多处使用到了 JavaSparkContextmap 方法,去 Spark 化,则需要将 JavaSparkContext 隐藏,针对该问题我们引入了 HoodieEngineContextmap 方法,该方法会屏蔽 map 的具体实现细节,从而在抽象成实现去 Spark 化。

3抽象层尽量减少改动,保证 Hudi 原版功能和性能。

4.Flink 集成设计

Hudi 的写操作在本质上是批处理,DeltaStreamer 的连续模式是通过循环进行批处理实现的。为使用统一 API,Hudi 集成 Flink 时选择攒一批数据后再进行处理,最后统一进行提交(这里 Flink 我们使用 List 来攒批数据)

攒批操作最容易想到的是通过使用时间窗口来实现,然而,使用窗口,在某个窗口没有数据流入时,将没有输出数据,Sink 端难以判断同一批数据是否已经处理完。因此我们使用 Flink 的检查点机制来攒批,每两个 Barrier 之间的数据为一次,当某个子任务中没有数据时,mock 结果数据凑数。这样在 Sink 端,当每个子任务都有结果数据下发时即可认为一批数据已经处理完成,可以执行 commit。

DAG 如下:

作为其数据湖框架,Apache,Hudi,与,Apache,4.Flink,集成(图1)

source 接收 Kafka 数据,转换成 List。

InstantGeneratorOperator 生成全局唯一的 instant.当上一个 instant 未完成或者当前批次无数据时,不创建新的 instant。

KeyBy partitionPath 根据 partitionPath 分区,避免多个子任务写同一个分区。

WriteProcessOperator 执行写操作,当当前分区无数据时,向下游发送空的结果数据凑数。

CommitSink 接收上游任务的计算结果,当收到 parallelism 个结果时,认为上游子任务全部执行完成,执行 commit.

注:InstantGeneratorOperator 和 WriteProcessOperator 均为自定义的 Flink 算子,InstantGeneratorOperator 会在其内部阻塞检查上一个 instant 的状态,保证全局只有一个 inflight(或 requested)状态的 instant.WriteProcessOperator 是实际执行写操作的地方,其写操作在 checkpoint 时触发。

5. 实现示例

1 HoodieTable

/**

* Abstract implementation of a HoodieTable.

* @param Sub type of HoodieRecordPayload

* @param Type of inputs

* @param Type of keys

* @param Type of outputs

*/

public abstract class HoodieTableimplements Serializable

protected final HoodieWriteConfig config。

protected final HoodieTableMetaClient metaClient。

protected final HoodieIndexindex。

public abstract HoodieWriteMetadataupsertHoodieEngineContext context, String instantTime。

I records

public abstract HoodieWriteMetadatainsertHoodieEngineContext context, String instantTime。

I records

public abstract HoodieWriteMetadatabulkInsertHoodieEngineContext context, String instantTime。

I records, Option> bulkInsertPartitioner

HoodieTable 是 Hudi 的核心抽象之一,其中定义了表支持的 insert,upsert,bulkInsert 等操作。以 upsert 为例,输入数据由原先的 JavaRDD inputRdds 换成了 I records, 运行时 JavaSparkContext jsc 换成了 HoodieEngineContext context.

从类注释可以看到 T,I,K,O 分别代表了 Hudi 操作的负载数据类型、输入数据类型、主键类型以及输出数据类型。这些泛型将贯穿整个抽象层。

2 HoodieEngineContext

/**

* Base class contains the context information needed by the engine at runtime. It will be extended by different

* engine implementation if needed.

*/

public abstract class HoodieEngineContext

public abstract Listmap(Listdata, SerializableFunctionfunc, int parallelism)

public abstract ListflatMap(Listdata, SerializableFunction> func, int parallelism)

public abstract void foreach(Listdata, SerializableConsumerconsumer, int parallelism)

HoodieEngineContext 扮演了 JavaSparkContext 的角色,它不仅能所有 JavaSparkContext 能的信息,还封装了 map,flatMap,foreach 等诸多方法,隐藏了 JavaSparkContextmap,JavaSparkContextflatMap,JavaSparkContextforeach 等方法的具体实现。

以 map 方法为例,在 Spark 的实现类 HoodieSparkEngineContext 中,map 方法如下:

@Override

public Listmap(Listdata, SerializableFunctionfunc, int parallelism)

return javaSparkContext.parallelize(data, parallelism).map(func:apply).collect。

在操作 List 的引擎中其实现可以为(不同方法需注意线程安全问题,慎用 parallel)

@Override

public Listmap(Listdata, SerializableFunctionfunc, int parallelism)

return data.stream.parallel.map(func:apply).collect(Collectors.toList)

注:map 函数中抛出的异常,可以通过包装 SerializableFunction func 解决.

这里简要介绍下 SerializableFunction:

@FunctionalInterface

public interface SerializableFunctionextends Serializable

O apply(I v1) throws Exception。

该方法实际上是 java.util.function.Function 的变种,与java.util.function.Function 不同的是 SerializableFunction 可以序列化,可以抛异常。引入该函数是因为 JavaSparkContextmap 函数能接收的入参必须可序列,同时在hudi的逻辑中,有多处需要抛异常,而在 Lambda 表达式中进行 try catch 代码会略显臃肿,不太优雅。

6.现状和后续计划

6.1 工作时间轴

2020 年 10 月 2 日,HUDI-1089 合并入 Hudi 主分支,标志着 Hudi-Spark 解耦完成。

6.2 后续计划

1推进 Hudi 和 Flink 集成

将 Flink 与 Hudi 的集成尽快推向社区,初期该特性可能只支持 Kafka 数据源。

2性能优化

为保证 Hudi-Spark 版本的稳定性和性能,此次解耦没有太多考虑 Flink 版本可能存在的性能问题。

3类 flink-connector-hudi 第三方包

将 Hudi-Flink 的绑定做成第三方包,用户可以在 Flink 应用中以编码方式读取任意数据源,通过这个第三方包写入 Hudi。

本文相关词条概念解析:

数据

数据就是数值,也就是我们通过观察、实验或计算得出的结果。数据有很多种,最简单的就是数字。数据也可以是文字、图像、声音等。数据可以用于科学研究、设计、查证等。数据背景是接收者针对特定数据的信息准备,即当接收者了解物理符号序列的规律,并知道每个符号和符号组合的指向性目标或含义时,便可以获得一组数据所载荷的信息。数据作为信息的载体,当然要分析数据中包含的主要信息,及分析数据的主要特征。数据(Data)是载荷或记录信息的按一定规则排列组合的物理符号。

网友评论
相关文章
在跨行业边缘处的用例与架构

在跨行业边缘处的用例与架构

在跨行业边缘处的用例与架构[详情]

数据开发是数据湖需要解决的三大问题,数据开发产品

数据开发是数据湖需要解决的三大问题,数据开发产品

数据开发是数据湖需要解决的三大问题,数据开发产品[详情]

读取模式是错误的,这些计算引擎操作起来很复杂,这些数据管道过滤

读取模式是错误的,这些计算引擎操作起来很复杂,这些数据管道过滤

读取模式是错误的,这些计算引擎操作起来很复杂,这些数据管道过滤[详情]

网站地图    Copyright     2016-2018  资讯网   All rights reserved.