最新要闻

广告

手机

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

家电

当前动态:ETL的架构设计和实现及其优势

来源:博客园
目录
  • ETL的架构
    • ETL架构的优势
    • 离线 ETL 的架构设计
    • 离线 ETL 的模块实现
      • 数据分片(Split)
      • 数据解析清洗(Read)
      • 多文件落地(Write)
      • 检测数据消费完整性 (Commit)
  • 参考链接

ETL的架构

ETL架构的优势

ETL相对于EL-T架构可以实现更为复杂的数据转化逻辑ETL采用单独的硬件服务器,可以分担数据库系统的负载ETL与底层的数据库数据存储无关,可以保持所有的数据始终在数据库当中,避免数据的加载和导出,从而保证效率,提高系统的可监控性。ELT主要通过数据库引擎来实现系统的可扩展性(尤其是当数据加工过程在晚上时,可以充分利用数据库引擎的资源)ELT可以根据数据的分布情况进行并行处理优化,并可以利用数据库的固有功能优化磁盘I/O。ELT的可扩展性取决于数据库引擎和其硬件服务器的可扩展性。通过对相关数据库进行性能调优,ETL过程获得3到4倍的效率提升一般不是特别困难。

离线 ETL 的架构设计

离线 ETL 采用 MapReduce 框架处理清洗不同业务的数据,主要是采用了分而治之的思想,能够水平扩展数据清洗的能力;

graph LR 1[Input] --> 2[Map] --> 3[Output]

如上图所示,离线 ETL 分为三个模块:


(资料图片仅供参考)

  • Input(InputFormat):主要对数据来源(Kafka 数据)进行解析分片,按照一定策略分配到不同的 Map 进程处理;创建 RecordReader,用于对分片数据读取解析,生成 key-value 传送给下游处理。
  • Map(Mapper):对 key-value 数据进行加工处理。
  • Output (OutputFormat):创建 RecordWriter 将处理过的 key-value 数据按照库、表、分区落地;最后在 commit 阶段检测消息处理的完整性。

离线 ETL 的模块实现

数据分片(Split)

我们从 kafka 获取当前 topic&partition 最大的 offset 以及上次消费的截止 offset ,组成本次要消费的[beginOffset、endOffset]kafkaEvent,kafkaEvent 会打散到各个 Mapper 进行处理,最终这些 offset 信息持久化到 mysql 表中。

那么如何保证数据不倾斜呢?首先通过配置自定义 mapper 个数,并创建对应个数的 ETLSplit。由于 kafkaEevent 包含了单个 topic&partition 之前消费的 Offset 以及将要消费的最大 Offset,即可获得每个 kafkaEvent 需要消费的消息总量。最后遍历所有的 kafkaEevent,将当前 kafkaEevent 加入当前最小的 ETLSplit(通过比较需要消费的数据量总和,即可得出),通过这样生成的 ETLSplit 能尽量保证数据均衡。

数据解析清洗(Read)

如上图所示,首先每个分片会有对应的 RecordReader 去解析,RecordReade 内包含多个 KafkaConsumerReader ,就是对每个 KafkaEevent 进行消费。每个 KafkaEevent 会对应一个 KafkaConsumer,拉取了字节数据消息之后需要对此进行 decode 反序列化,此时就涉及到 MessageDecoder 的结构。MessageDecoder 目前支持三种格式:

格式涉及 topic
Avroandroid、ios、ad_sdk_android...
Jsonapp-server-meipai、anti-spam...
DelimiterTextapp-server-youyan、app-server-youyan-im...

MessageDecoder 接收到 Kafka 的 key 和 value 时会对它们进行反序列化,最后生成 ETLKey 和 ETLValue。同时 MessageDecoder 内包含了 Injector,它主要做了如下事情:

  • 注入 Aid:针对 arachnia agent 采集的日志数据,解析 KafkaKey 注入日志唯一标识 Aid;
  • 注入 GeoIP 信息:根据 GeoIP 解析 ip 信息注入地理信息(如 country_id、province_id、city_id);
  • 注入 SdkDeviceInfo: 本身实时流 ETL 会做注入 gid、is_app_new 等信息,但是离线 ETL 检测这些信息是否完整,做进一步保障。

过程中还有涉及到 DebugFilter,它将 SDK 调试设备的日志过滤,不落地到 HDFS。

多文件落地(Write)

由于 MapReduce 本身的 RecordWriter 不支持单个落地多个文件,需要特殊处理,并且 HDFS 文件是不支持多个进程(线程)writer、append,于是我们将KafkaKey+ 业务分区+ 时间分区 + Kafka partition定义一个唯一的文件,每个文件都是会到带上 kafka partition 信息。同时对每个文件创建一个RecordWriter。

每个 RecordWriter 包含多个 Writer ,每个 Writer 对应一个文件,这样可以避免同一个文件多线程读写。目前是通过 guava cache 维护 writer 的数量,如果 writer 太多或者太长时间没有写访问就会触发 close 动作,待下批有对应目录的 kafka 消息在创建 writer 进行 append 操作。这样我们可以做到在同一个 map 内对多个文件进行写入追加。

检测数据消费完整性 (Commit)

MapReduce Counter 为提供我们一个窗口,观察统计 MapReduce job 运行期的各种细节数据。并且它自带了许多默认 Counter,可以检测数据是否完整消费:

reader_records: 解析成功的消息条数;

decode_records_error: 解析失败的消息条数;

writer_records: 写入成功的消息条数;

...

最后通过本次要消费 topic offset 数量、reader_records 以及 writer_records 数量是否一致,来确认消息消费是否完整。

允许一定比例的脏数据,若超出限度会生成短信告警

参考链接

https://blog.csdn.net/javastart/article/details/113838240

美图离线ETL实践 - 掘金 (juejin.cn)

关键词: 可扩展性 反序列化