打开网易新闻 查看精彩图片

经过几期深入浅出Apache Spark系列直播的热烈讨论,我们收到了众多观众的热情反馈与宝贵问题。

为了让更多未能参与直播的朋友们也能受益,我们精心整理了直播中的“问题现场Q&A”环节,现将精选内容以文字版形式呈现给大家!

Q1:如何看待实时计算领域 Spark Streaming 和 Flink 两条技术路线的现状和未来?

A1:与两者发展的时间阶段有关,Spark 发展较早,Spark SQL 已经因为在批处理和针对 Hive 性能优化上的工作成为主流框架后,Flink 才出现,通过实时处理来跟 Spark 打差异。早期两者在计算模型上存在差异,Flink 认为的实时计算是数据到计算节点之后可以立刻被扔出去;而 Spark Streaming 是在一个时间窗口内(例如:100ms)进行数据累计,然后成批发出去。随着这些年的发展,两者在实时计算上已经很像。首先如果将 Spark Streaming 100ms 的窗口调整到 10ms 效果会跟 Flink 很接近,同时 Spark Streaming 后面也实现了一个和 Flink 很接近的实时计算模型。而 Flink 在生产环境下有被发现背压或者数据处理能力吞吐量不足以支撑高并发的情况,为优化这点 Flink 处理方式上选择了和 Spark 类似的缓存一部分消息。对于两者未来的发展方向,个人感觉,因为 99% 的场景都是批处理或者离线的,真正实时计算场景比较少。Flink 后面也提出了批流一体,未来两者应该会越来越重叠。

Q2:Spark SQL 计算时数据倾斜有什么优化手段?

A2:Spark3.0 AQE 框架支持数据倾斜的优化,分为小分区的数据合并和大分区的数据拆分。

Q3:Spark SQL 中多个 count(distinct)怎么优化?

A3:Spark 通过 RewriteDistinctAggregates 来进行性能优化。

Q4:Structured Streaming 和 Streaming 有啥区别?在流式处理时怎么选择?

A4:只需要选择 structured stream,早期的 Streaming 方式社区已经准备抛弃。

Q5:Spark SQL 能实现类似 Flink SQL 的功能吗?比如只写 SQL 就能实现从 kafka 消费数据,处理入库?

A5:可以,举个例子 360 的 XSQL 项目支持通过 SQL 去操纵 kafka。

Q6:计算小文件多是如何解决的?

A6:有些场景可以去调整 Spark SQL 默认 partition 的数量;如果是与 shuffle 相关的小文件,当前 Spark 版本已经解决了这个问题。

Q7:对于不同的数据源,如 My SQL, ES, Hive 等,即使都可以支持 SQL 也有不同方言,是否应该提出一个通用的计算逻辑表达,屏蔽底层方言?

A7:快手,360,华为,腾讯等公司都有类似的应用,数新网络的 CyberSQL 也有同样功能,通过统一 SQL 减少使用复杂度。

Q8:单个 count(distinct)会优化成 group by, 多条 SQL 语句中多个 count(distinct)怎么避免数据膨胀?

A8:Spark 的 RewriteDistinctAggregates 优化规则,通过 Expand 物理算子,根据条件制造伪的数据列,伪的数据列可以满足不同 DISTINCT 对应的数据值,来减少 Shuffle 次数。

Q9:Spark SQL 默认参数是怎么设的?比如 Executor 数,内存数。

A9:需要结合数据量和任务量具体场景具体评估。

Q10:Spark SQL 怎么实现联邦查询。社区版默认没有实现 JDBC 和 Hive 的 Catalog,这方面能给一些解决方法么?

A10:Spark SQL 默认就支持 Hive;对于 JDBC,社区提供了一个框架里面的功能主要围绕 H2 内存数据库。但 H2 一般不会在生产使用,而实际使用中用户可能需要在生产环境通过 JDBC 接入 MySQL 等数据库,所以社区目前更多的是在通过框架来展示自己的可扩展性,实际生产环境应用还是需要用户自己去进行实现;

Q11:AQE 动态执行的原理是什么,动态修改物理执行计划吗?

A11:AQE 可以在执行阶段插入 Query Stage,预先把小的查询提前进行执行,拿到对应的统计信息,大表的扫描暂缓执行;比如 ABC 三张表进行 join,A 表和 B 表 join 后的数据量,Spark 没有办法提前知道,需要执行后获得数据规模等信息。获取信息后,本身可能是需要 Shuffle 过程的 Sort Merge Join,如果发现 A 和 B join 后的数据量很小,可以优化为 Broadcast Join。也就是说,Spark3.0 之后即便逻辑计划已经转化为物理计划,物理计划也会通过动态的探测调整,继续去做优化。

Q12:在 Spark 执行性能这块,有提出要 Native 执行,能介绍一下这块原理和未来发展么?

A12:Spark 本地化执行是因为 Spark 社区的一些公司提出的,利用了 C 语言在特定操作系统硬件环境下相比 Java 的性能优势来进行优化。早期大家对于向量化的期待比较高,现在看向量化只是 Native 性能优化的一项。向量化相关项目中,目前Databricks 的 Photon 项目并不开源,社区里面的开源项目有 Gluten。因为向量化有性能增益,在降本增效的大背景下值得很多公司去关注。

Q13:RDD 的第五个属性,优先位置要怎么理解?

A13:优先位置是本地化相关的考虑,决定 RDD 跟上游依赖数据产生的 Task 是否在同一个进程/节点/机器上跑,主要解决的是数据的跨网络传输问题,如果是数据在同一个进程本地访问就行了。

Q14:CheckPoint 现在还经常在代码中使用吗?

A14:如果用 Spark Core API 写代码,检查点还是需要的。如果 RDD 编织的 DAG 图某部分已经执行完毕,某部分执行失败,可以从检查点恢复数据,避免重复计算。

Q15:拉数据的时候数据端服务被拉爆了一般是什么状况?

A15:目前比较常见的是,因为 Spark 每个执行器自带的 Shuffle 服务,当上游 Task 执行完毕,有部分 Shuffle 数据落到磁盘里面等待下游其他任务去拉取数据,由 Shuffle 服务提供单独的接口提供访问。同时 Executor 会去执行其他任务。下游拉数据的时候,这部分数据会从磁盘加载到内存里面,再通过 Netty 发出去,这样会占用一部分内存;同时 Netty 通信也需要分配堆外内存;Executor 执行下一个任务也需要占用内存。几点叠加容易导致内存不够 OOM, 在 Yarn 或者 K8s 上跑还会被 kill 掉。对于这点,有一些外部 Shuffle 服务的开源产品,比如 RSS、ESS,通过空间换稳定性,用额外的存储计算资源来保证任务更稳定运行。

系列直播精彩继续,扫码预约

公司简介

浙江数新智能有限公司是一家专注于多云数据智能平台和数据价值流通的服务商。公司总部位于杭州,在上海、北京、深圳等各地设有分支机构,服务网络覆盖全国各区域,客户遍布全球 50+城市。数新智能自成立以来就在人工智能领域进行了深入的探索,已有成熟的产品、基于场景的解决方案及不同行业的成功案例。帮助金融、能源电力等行业相关企业实现数字化、智能化转型,提升企业新质生产力。

数新智能自主研发的一站式多云数据智能平台,主要包括赛博数智引擎CyberEngine、赛博数据平台CyberData、赛博智能平台CyberAI,可提供基于大数据的大模型调优、深度学习、价值流通等多种服务。数新智能自主研发的赛博数智引擎CyberEngine基于开源开放的设计理念,兼容开源引擎并进行深度优化,开放式架构支持主流引擎生态,支持多元异构引擎灵活插拔,支持流批一体、湖仓一体、数智一体等场景化能力。在此基础上,CyberEngine以Spark、Flink作为主计算引擎,以Spark为例,基于Spark实现数新智能的流批引擎、统一查询引擎,在性能、稳定性、云原生化等方面全面优于社区开源版本。

耿嘉安

数新智能高级架构专家 Spark Committer

2014 阿里巴巴御膳房主力开发

2016 软件开发&大数据开发 出版畅销书籍《深入理解 Spark 》

2016 艺龙网大数据架构师 主导开发大数据平台

2017 360 大数据专家 出版畅销书籍《 Spark 内核设计的艺术》

2018 360 高级大数据专家 主导开发 XSQL 查询平台

2020 麒麟高级性能专家 主导 Kylin 执行引擎加速

2024 数新智能高级架构专家