基于 Flink 流将数据移动到 Doris。

基于 Flink 流将数据移动到 Doris。

本文介绍 Apache Doris 如何基于Flink Streaming帮助您从 MySQL 等上游数据库向 Doris 导入数据并进行变更数据捕获(CDC)。但首先,您可能会问:Apache Doris 是什么,我为什么要费心这样做?

那么,Apache Doris是一个开源的实时分析数据仓库,同时支持高并发点查询和高吞吐量的复杂分析。提供亚秒级解析查询能力,在多维分析、仪表盘等实时数据服务中大显身手。

概述

概述

  1. 如何秒级完成端到端的数据同步
  2. 如何确保实时数据可见性
  3. 如何让海量小文件的写入更流畅
  4. 如何保证端到端的Exactly-Once处理
实时性

实时性

  1. 流写入
Doris 中的 Flink-Doris Connector 曾经遵循“缓存和批量写入”的数据摄取方法。但是,这需要明智地选择批大小和批写入间隔;否则事情可能会出错。例如,如果批大小太大,则可能会出现OOM 错误。另一方面,频繁的写入可能会导致生成过多的数据版本。
为了避免这样的麻烦,Doris 实现了一个 Stream Write 方法,其工作原理如下:

  1. Flink 任务一旦启动,就会异步发起 Stream Load HTTP 请求。
  2. 数据通过 HTTP 的分块传输编码机制传输到 Doris。
  3. HTTP 请求在 Checkpoint 结束,即 Stream Load 任务完成。同时,异步发起下一个 Stream Load 请求。
  4. 重复以上步骤。

  1. 事务处理

  1. 数据版本快速聚合
小文件的高并发写入会在 Doris 中产生过多的数据版本,导致数据查询变慢。因此,Doris 增强了数据压缩能力,以便快速聚合数据。

首先,Doris 介绍了 Quick Compaction。具体来说,一旦数据版本增加,就会触发数据压缩。同时,Doris 通过扫描 tablets 的元数据,识别出数据版本过多的 tablet,并进行相应的 compaction。

其次,针对高并发、高频率的小文件写入,Doris实现了Cumulative Compaction。它从调度的角度将这些压缩任务与重量级的Base Compaction隔离开来,避免它们之间的相互影响。

最后但同样重要的是,Doris 采用了分层数据聚合的方法,可以确保每次聚合只涉及相似大小的文件。这大大降低了聚合任务的总数和系统的CPU占用率。

恰好一次

恰好一次

Exactly-Once 语义意味着数据将被处理一次且仅一次。即使机器或应用程序出现故障,它也可以防止数据被重新处理或丢失。

Flink 实现了一个 2PC 协议来实现 Sink 算子的 Exactly-Once 语义。基于此,Doris 中的 Flink-Doris Connector 实现了 Stream Load 2PC 来传递 Exactly-Once 处理。详情如下所示:

  1. Flink 任务一旦启动就会发起 Stream Load PreCommit 请求。然后会开启一个事务,通过HTTP的chunked机制,源源不断的向Doris发送数据。

  1. HTTP 请求在 Checkpoint 结束,Stream Load 完成。事务状态将设置为预提交。此时数据已经写入BE,对用户不可见。

  1. Checkpoint 发起请求,并将事务状态更改为 Committed。在此之后,数据将对用户可见。

  1. 在Flink应用失败的情况下,如果之前的事务处于Pre-Committed状态,Checkpoint会发起回滚请求,将事务状态变为Aborted。
Doris在高并发场景下的表现

Doris在高并发场景下的表现

场景描述

场景描述

使用 Flink 从 Kafka 导入数据。在 ETL 之后,使用 Flink-Doris Connector 将实时数据摄取到 Doris 中。

要求

要求

上游数据以每秒 10 万条的高频率写入 Doris。为实现数据实时可见,上下游数据需要在5s左右同步。

Flink 配置

Flink 配置

并发数:20

检查点间隔:5s

以下是 Doris 的做法:

压缩实时性

压缩实时性

结果显示,Doris 设法快速聚合数据,并使平板电脑中的数据版本数保持在 50 个以下。同时,Compaction Score 保持稳定。

CPU使用率

CPU使用率

在优化了小文件的压缩策略后,Doris 将 CPU 占用率降低了 25%。

查询延迟

查询延迟

通过降低 CPU 使用率和数据版本数量,Doris 将数据排列得更加有序,从而实现更低的查询延迟

Doris在低延迟场景下的表现(高阶压测)

Doris在低延迟场景下的表现(高阶压测)

描述

描述

  1. 客户端单BE、单片Stream Load压力测试
  2. 数据实时性<1s

以下是优化前后的压缩分数:

Doris 使用建议

Doris 使用建议

低延迟场景

低延迟场景

对于需要实时数据可见性的场景(比如秒级数据同步),每次摄取的文件通常比较小。因此,建议将cumulative_size_based_promotion_min_size_mbyte默认值 64 减少到 8(以 MB 为单位)。这可以大大提高压实性能。
高并发场景

高并发场景

对于高并发的写入场景,建议通过将Checkpoint间隔增加到5-10s来降低Stream Load的频率。这不仅增加了 Flink 任务的吞吐量,也减少了小文件的产生,从而避免了额外的压缩压力。另外,对于实时性要求不高的场景(比如分钟级数据同步),建议将Checkpoint间隔增加到5-10分钟。这样,Flink-Doris Connector 仍然可以通过 2PC+Checkpoint 机制保证数据的完整性。
结论

结论

Apache Doris 通过其 Stream Write 方式、事务处理能力和数据版本的聚合来实现数据的实时性。这些技术有助于减少内存和 CPU 使用率,从而降低延迟。此外,为了数据的完整性和一致性,Doris 实现了 Stream Load 2PC 以保证所有数据都恰好被处理一次。这就是 Doris 如何促进快速和安全的数据摄取。