最新要闻
- 全球动态:一加Ace 2全球首发1.5K灵犀触控屏:120Hz刷新率、1450nit高亮度
- 每日消息!知名玩家爆料魔兽国服已签新代理:暴雪相中上海一公司
- 全球热点!土耳其再发生5.2级地震:中国卫星传回土耳其地震震中图像
- 全麦+黑麦:舌里0脂肪全麦面包14.9元2斤40片大促
- 国产版ChatGPT!百度版已成功注册ERNIE商标:最快3月开放
- 车顶维权女车主:重启战斗模式 希望今年能明确特斯拉案件结果
- 小米首款二合一笔记本!小米笔记本12.4图赏
- 每日速看!陪伴十年被撞报废!男子躲过一劫磕头跪别爱车 网友:它值得
- 环球视点!一加Ace 2普及16GB超大内存:44个应用保活
- 世界观点:首批入手三星Galaxy S23用户开机后大跌眼镜:系统吃掉60GB存储空间
- 上海网红“安福路小公主”接代言引热议:网友感叹辣眼睛的大妈无敌了
- 全球播报:13代CPU+40系GPU 笔记本和台式机区别有多大?
- 微头条丨100%纯棉 班尼路短袖99元4件包邮:13色可选
- 今日热文:ChatGPT概念全球爆火!我国人工智能相关企业超百万家
- 没法出二手“回血”了!索尼称PS VR2初期将仅推出数字版游戏
- 世界动态:89%美国大学生用写作业!揭秘爆红ChatGPT之父:应对核末日狂囤黄金、中学就出柜
手机
iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?
警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案
- iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?
- 警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案
- 男子被关545天申国赔:获赔18万多 驳回精神抚慰金
- 3天内26名本土感染者,辽宁确诊人数已超安徽
- 广西柳州一男子因纠纷杀害三人后自首
- 洱海坠机4名机组人员被批准为烈士 数千干部群众悼念
家电
RocketMQ Streams拓扑构建与数据处理过程
本文作者:倪泽,Apache RocketMQ committer、RSQLDB/RocketMQ Streams Maintainer
01背景
RocketMQ Streams 1.1.0版本已于近期发布,相对之前版本有以下改进和优化:
(资料图)
1、API层面支持泛型,可自定义输入输出数据;
2、去掉冗余逻辑,简化代码,重写拓扑图构建和数据处理过程;
本文章承接上篇:RocketMQ Streams 1.1.0: 轻量级流处理再出发,从实现原理上介绍RocketMQ Streams是如何实现流计算拓扑图构建的以及探讨了数据流转过程和流转过程中的状态变化。
02流处理拓扑构建过程
public class example { public static void main(String[] args) { StreamBuilder builder = new StreamBuilder("wordCount"); builder.source("sourceTopic", total -> { String value = new String(total, StandardCharsets.UTF_8); return new Pair<>(null, value); }) .flatMap((ValueMapperAction>) value -> { String[] splits = value.toLowerCase().split("\W+"); return Arrays.asList(splits); }) .keyBy(value -> value) .count() .toRStream() .print(); TopologyBuilder topologyBuilder = builder.build(); Properties properties = new Properties(); properties.put(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876"); RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties); Runtime.getRuntime().addShutdownHook(new Thread("wordcount-shutdown-hook") { @Override public void run() { rocketMQStream.stop(); } }); rocketMQStream.start(); }}
在使用者书写上述及连表达式时,发生第一次构建,即逻辑节点的添加,前后算子具有父子关系,构建后形成逻辑节点,多个逻辑节点形成链表。
逻辑构建结束后,调用StreamBuilder#build()方法进行第二次构建,将逻辑节点中可能包含的多个真实节点添加拓扑,形成处理拓扑图。
经过两次两次构建后,处理拓扑已经完整。但是为了区分不同topic使用不同拓扑处理,需要在数据来临前的重平衡阶段,创建真实数据处理节点,这是第三次构建。
逻辑构建(第一次构建)
逻辑节点本身不包括实际操作,但是可由逻辑节点继续构建出实际节点,一个逻辑节点可能包含一实际节点,也可能包含多个实际节点,例如count逻辑算子不仅仅包含累加这个实际操作,累加前需要对相同key的数据路由到同一计算实例上,因此还需要包含sink、source两个实际节点,但是这些只会在构建实际节点时体现出来,不会在添加逻辑节点阶段体现。
每个逻辑节点都是GraphNode的子类,构建时,将子节点算子加入父节点child集合中,将父节点加入子节点parent集合中。这个构建过程中使用Pipeline均为同一个实例。最终在Pipeline中,形成以root节点为根节点的链表。
添加逻辑节点逻辑:
@Overridepublic GroupedStream map(ValueMapperAction mapperAction) { //1、确定节点名称 //2、实现Supplier类,实现数据处理逻辑 //3、实例化逻辑节点类GraphNode //4、将逻辑节点GraphNode添加到pipeline中形成链表}
可以看到逻辑节点的添加非常通用,实现不同功能的算子,只需要实现算子对应的数据实际处理逻辑即可,如将新增算子形成拓扑图等等后续工作完全不用关心,降低了新算子开发的门槛。
在逻辑节点的构建过程中,有两类比较特殊的算子,一个是实现数据分组的shuffle算子,一个是实现双流聚合的Join算子。
shuffle逻辑算子的功能是将含有相同key的数据发送到同一个队列中,方便后续算子对相同key的数据进行统计。他通常是keyBy后面紧跟的算子,例如keyBy("年级").count(),那么count就是一个shuffle算子类型。shuffle逻辑算子包含三个实际处理过程:
- 将数据按照Key的hash%queueNum发送到对应队列;
- 从RocketMQ中拉取上述数据到本地;
- 按照shuffle节点中定义的逻辑进行处理,例如累加。
Join算子的功能是实现双流聚合,将两个数据流聚合成一个。
Join拓扑图
在左流和右流上添加KeyBy算子,对左流和右流进行分别过滤;之后在左流和右流上分别添加标签节点,标记此数据是左流还是右流,之后将两个标签节点,指向一个共同的Join节点,数据在此完成汇聚,按照使用者给定的ValueJoinAction节点处理。
Join使用方式:
StreamBuilder builder = new StreamBuilder(jobId);RStream leftStream = builder.source(...);RStream rightStream = builder.source(...);ValueJoinAction action = new ValueJoinAction(){...};leftStream.join(rightStream) .where(左流字段) .equalTo(右流字段) .apply(action) .print();
Join实现伪代码:
//左右流按照各自字段分组,含有相同key的字段会被回写到同一个队列里面;GroupedStream leftGroupedStream = leftStream.keyBy(左流字段);//因为后面左右流数据会在一起处理,为了区分数据来源,在数据中添加标记是左流还是右流leftGroupedStream.addGraphNode(addTag);//获取leftGroupedStream最后的逻辑节点GraphNode leftLast = leftGroupedStream.getLast(); GroupedStream rightGroupedStream = leftStream.keyBy(右流字段);rightGroupedStream.addGraphNode(addTag);GraphNode rightLast = rightGroupedStream.getLast();//数据汇聚节点ProcessorNode commChild = new ProcessorNode(name, temp, “聚合数据实际操作”);commChild.addParent(leftLast);commChild.addParent(rightLast);//统一数据流RStreamImpl commRStream = new RStreamImpl<>(Pipeline, commChild);//继续在统一数据流上操作commRStream...
实际构建(第二次构建)
构建逻辑节点完毕后,从ROOT节点开始遍历,调用GraphNode逻辑节点addRealNode方法,构建真实节点工厂类。
在第二次构建实际节点过程中,会对逻辑节点进行拆解,对于大多数逻辑节点,只需要构建一个实际节点,但是对于某些特殊的逻辑节点需要构建多个实际节点才能与之对应,例如shuffle类型逻辑节点,他需要包含三个实际节点:发送数据节点、消费数据节点、处理数据节点。shuffle类型逻辑节点父节点必须是GroupBy,例如上图所示的count是shuffle节点,Window节点也可以是逻辑节点。
第二次构建并不会直接生成处理数据的Processor,而是产生ProcessorFactory对象。为什么不生成直接能处理数据的Processor对象呢?因为一个RocketMQ Streams实例需要同时拉取不同队列进行流计算,为了能将不同队列的流计算过程区别开,针对每个队列会由独立的Processor实例进行处理,因此第二次构建仅仅构建出ProcessorFactory,在重平衡确定流处理实例要拉去哪些队列后,再由ProcessorFactory实例化Processor。
第三次构建
客户程序依赖RocketMQ Streams获得流计算能力,因此客户程序本质上是就是一个RocketMQ Client(见方案架构图)。在RocketMQ Client发生重平衡时,会将RocketMQ Server所包含的队列在客户端中重新分配,第三次构建,也就是由ProcessorFactory实例化Processor,就发生在重平衡发生后,拉取数据前。第三次真实的构建出了处理数据的Processor,并将子节点Processor添加进入父节点Processor中。
03数据处理过程
状态恢复
流处理过程中产生的计算状态保存、恢复涉及到流处理过程的正确性。在流处理实例宕机的情况下,该流处理实例上消费的队列会被重平衡到其他流处理实例上。如果对该队列进行了有状态计算,那么产生的状态也需要在新的流计算实例上恢复。如上图中,Instance1宕机,他消费的MQ2和MQ3被分别迁移到Instance2和Instance3上,MQ2和MQ3对应的状态(紫色和蓝色)也需要在Instance2和Instance3上恢复出来。
- 存储介质
使用本地RocksDB,远程RocketMQ的组合,作为状态存储介质。流计算在计算状态时,RocksDB在使用有限内存情况下,作为状态的临时存储,用于算子交互,在计算结束后提交消费位点时将本次计算产生的状态一并写入RocketMQ中。消费位点提交、计算结果写出、状态保存需要保持原子状态,这一内容在后面流计算正确性中讨论。
- 状态持久化存储
RocketMQ作为消息临时存储,存在数据最大过期时间,一旦过期后,数据会被删除。但是状态存储介质本质上是以KV方式存储数据,不希望KV数据随着时间过期而被删除。因此,使用Compact topic作为状态存储,他会对同一队列的数据按照Key对数据进行压缩,相同Key的数据只保留offset最大的一条。
//key如果决定数据被发送到某个Broker的哪个队列int queueId = hash(key) % queueNum
但是在RocketMQ中队列数会随着Broker扩缩容而增加或者减少,扩缩Broker数量前后,相同的Key可能被发送到不同的队列,那么按照上述规则进行Compact后得到某个key所在的queueId就是错误的,使用Compact topic作为KV存储就失去了意义。
因此在状态topic是Compact topic的基础上,再将状态topic创建为Static topic(Logic Queue),即状态topic即是Compact topic也是Static topic。这样才能解耦队列数量与Broker数量,使队列数量在扩缩Broker情况下仍然不变,保证含有相同Key的数据能被发送到同一队列中。
- 状态重放
从被迁移状态队列拉取数据到本地进行重放,需要从队列头开始消费,相同Key的数据只保留offset最大的数据,形成K-V状态对,放入本地临时存储RocksDB中;
- 状态topic与source topic对应关系
因为状态topic中的队列会随着source topic队列迁移而迁移,保证对source topic队列中数据进行有状态处理得到正确的结果,因此在队列层面,状态topic与source topic应该是一一对应的关系。即状态topic名称与source topic名称一一对应,状态topic的队列数量等于source topic队列数量。source topic队列的流计算状态保存在状态topic的对应队列中。
数据处理
图中黑色线表示控制流,黄色线表示数据流;rebalance部分先于litePull部分被调用。
重平衡部分:
- 根据分配到的队列,到相应状态topic的相应队列中从头拉取数据,到本地重放,获得KV状态对,放入到本地RocksDB中。
- 根据数据源topic,构建对应的数据处理器processor(即第三次构建过程),保存起来;
数据处理部分:
- 使用litePull模式拉取数据,可以独立控制消费位点提交;
- 数据反序列化;
- 使用topic查找processor;
- 将processor的子节点保存起来(子节点在第三次构建过程中添加);
- 数据向上下文StreamContext中传递,由他将数据路由到下游节点;
- 数据处理前,现将下游节点的子节点保存起来供后续查找;
- 数据处理,如果有状态算子则与RocksDB交互,如果还有下游节点则继续进入StreamContext,如果没有下游节点则结束处理。
数据每次到下游节点前,先进入StreamContext中,由它统一向下游节点传递数据。StreamContext中包含了处理数据所需要的所有信息,包括数据来源、状态存储、下游子节点等等;
StreamContext不断递归迭代,将数据向下游传递,最终数据会被拓扑图上所有节点处理,由sink节点写出结果。
04参与贡献
RocketMQ Streams是Apache RocketMQ的子项目,已经在社区开源,并且提出了一些Good First Issue供感兴趣同学参加。参与RocketMQ Streams相关工作,请参考以下资源:
1、试用RocketMQ Streams,并阅读相关文档以了解更多信息;
maven仓库坐标:
org.apache.rocketmq rocketmq-streams 1.1.0
RocketMQ Streams文档:
https://rocketmq.apache.org/zh/docs/streams/01RocketMQ%20Streams%20Overview
2、参与贡献:如果你有任何功能请求或错误报告,请随时提交 Pull Request 来分享你的反馈和想法;
社区仓库:https://github.com/apache/rocketmq-streams
3、联系我们:可以在 GitHub上创建 Issue,向 RocketMQ 邮件列表发送电子邮件,或在 RocketMQ Streams SIG 交流群与专家共同探讨,RocketMQ Streams SIG加入方式:添加“小火箭”微信,回复RocketMQ Streams。
邮件列表:https://lists.apache.org/list.html?dev@rocketmq.apache.org
RocketMQ Streams拓扑构建与数据处理过程
全球动态:一加Ace 2全球首发1.5K灵犀触控屏:120Hz刷新率、1450nit高亮度
每日消息!知名玩家爆料魔兽国服已签新代理:暴雪相中上海一公司
全球热点!土耳其再发生5.2级地震:中国卫星传回土耳其地震震中图像
全麦+黑麦:舌里0脂肪全麦面包14.9元2斤40片大促
国产版ChatGPT!百度版已成功注册ERNIE商标:最快3月开放
强大的word插件:不坑盒子下载安装使用图文讲解
ChatGPT横空出世,虽然会改BUG,但程序员也不用慌
天天简讯:强大的word插件,让工作更高效:不坑盒子 2023版
pandas 用户数据分析2
车顶维权女车主:重启战斗模式 希望今年能明确特斯拉案件结果
小米首款二合一笔记本!小米笔记本12.4图赏
每日速看!陪伴十年被撞报废!男子躲过一劫磕头跪别爱车 网友:它值得
环球视点!一加Ace 2普及16GB超大内存:44个应用保活
世界观点:首批入手三星Galaxy S23用户开机后大跌眼镜:系统吃掉60GB存储空间
【天天速看料】【算法训练营day41】LeetCode343. 整数拆分 LeetCode96. 不同的二叉搜索树
今日观点!亚马逊商城入门教程_编程入门自学教程_菜鸟教程-免费教程分享
天天快资讯:计算机视觉——SSD和YOLO简介
焦点热议:CTF之WEB学习路线规划
当前焦点!第一个编译器
上海网红“安福路小公主”接代言引热议:网友感叹辣眼睛的大妈无敌了
全球播报:13代CPU+40系GPU 笔记本和台式机区别有多大?
微头条丨100%纯棉 班尼路短袖99元4件包邮:13色可选
今日热文:ChatGPT概念全球爆火!我国人工智能相关企业超百万家
没法出二手“回血”了!索尼称PS VR2初期将仅推出数字版游戏
视讯!火山引擎ByteHouse助力中国地震台网中心,快速构建一站式实时数仓
实战分享 | 金融数据采集报送平台实践
观天下!我是如何用CAP和BASE两个基础理论卷死其他组员的?
环球信息:【踩坑记录】@Transactional注解回滚不生效问题
世界微资讯!开心档-软件开发入门之MongoDB 覆盖索引查询
世界动态:89%美国大学生用写作业!揭秘爆红ChatGPT之父:应对核末日狂囤黄金、中学就出柜
当前快报:2023年首次更新!微信Mac版3.7.0发布:终于支持图片文字提取
世界观察:Github、Gitee优秀的开源项目
【ChatGPT 注册】
【天天聚看点】ChatGPT让苹果急了!消息称苹果将于下周召开内部AI峰会
环球快资讯丨299元!小米对讲机2S发布:市区5公里通话 100个对讲频道
热议:4个小号给大号浇水 缴电费再退骗蚂蚁森林能量被封号!法院判了
人工智能立大功!AI筛查阿尔茨海默病准确率达75%
每日视点!19999元!尼康发布尼克尔Z 85mm f/1.2 S镜头:模特发丝根根可数
观天下!从这些爆款小游戏中,你看到了什么?
Docker容器使用 (入门到精通)
HGAME_2023_WEB_WP_WEEK3
2799限量抢!小米Civi 2潮流版预售 网友:我一男生都表示太好看
世界报道:豆瓣仅5.7分 漫威超英大片《黑豹2》国内首映 开场票房就被《流浪地球2》超越
每日速递:男子花两千元套圈 套中两台宏光汽车:本人回应只要一辆 做人留一线
国产纯电两门跑车!哪吒E实车亮相:动力暴躁
焦点!百度类ChatGPT产品“文心一言”公开:股价应声上涨13%
诺基亚800c手机上市价格是多少?诺基亚800c手机参数
breeno指令是干什么用的?breeno指令在哪里?
戴尔笔记本电脑换电池大概多少钱?戴尔笔记本电脑开机黑屏没反应怎么办?
格力空调型号在哪里查看?格力空调型号一览表匹数
美的冰箱质量怎么样?美的冰箱怎么调温度?
天天视讯!Redis的十六种应用场景
全球短讯!一款备受欢迎的用户脚本管理器插件TampermonKey-油猴脚本管理器安装与使用
越狱最后各人物的结局是什么?电视剧越狱演员表
曾志伟的身高是什么?曾志伟个人资料
12月是什么星座?描写12月你好的优美说说
取关是什么意思?取关抖音好友对方知道吗?
期中考试家长意见怎么写?期中考试后的感想作文模板
热点评!19岁中国小伙确诊老年痴呆 全球纪录又被刷新了
全球看点:土耳其大地震 多家中国上市公司称影响有限
ChatGPT教你写AI包教包会!7段对话写出识别程序 准确度最高达99.7%
每日资讯:小米Civi 2潮流限定版来了:后盖上全是粉色Hello Kitty 超萌
世界看热讯:国产ChatGPT何时能出?百度站出来了:最快3月 评分全球第一
[西湖论剑2022]Misc-机你太美
焦点资讯:抖音视频无水印下载器,抖音视频下载器,下载未开放下载权限的视频,支持批量下载,同一作者作品自动批量下载,合集下载,话题挑战视频合集下载等
世界快消息!【2023微博签到爬虫】用python爬上千条m端微博签到数据
带了一个 3 年的开发,不会循环删除 List 中的元素,心态崩了。。
环球通讯!DataX插件二次开发指南
7斤纯铜就能压制酷睿i9 为啥还要散热风扇?原因一个字:贵
新能源起大早赶晚集 吉利失去的四年
全球新消息丨魔兽等游戏国服已停服两周 暴雪高管:寻求替代方式服务中国玩家
全球热点评!法拉利2022年销量破纪录 员工年终奖人均10万
世界新动态:读Java实战(第二版)笔记03_引入和使用流
焦点快看:必知必会的设计原则——开放封闭原则
焦点信息:官方加紧备货!红魔8 Pro系列开售一个多月供不应求:网友反映“秒没”
全球要闻:2.5K触屏骁龙本!小米Book 12.4二合一首销:到手2899元
环球微动态丨2023年来最强雨雪上线:北方下雪、南方暴雨 影响超20省份
单踏板不爽、刹车失灵无妨!特斯拉83%车主为新用户 客户流失低忠诚度最高
【环球速看料】聊天机器人ChatGPT要抢搜索引擎生意 谷歌出手:竞品Bard来了
【天天新要闻】安卓之光来了!曝小米13 Ultra 4月登场:支持120倍变焦
天天时讯:售价超过2万元 苹果MR头显即将推出:搭载全新OS 未来将取代iPhone
【天天新视野】我国多地取消中考男女生长跑 800米对健康不利:专家喊话体育锻炼不能放松
关于pacemaker-Cluster-节点的维护模式的功能介绍
SpringBoot怎么自定义一个Starter
当前快报:做出《新闻联播》片头的齐东旭教授走了:中国CAD与计算机图形学痛失巨匠
环球观焦点:游客放孔明灯被拽下吹灭 景区回应:明文规定禁放禁售
【天天新视野】《分布式技术原理与算法解析》学习笔记Day03
Spring IOC官方文档学习笔记(九)之基于注解的容器配置
每个程序员必学的10个Git命令
打价格战底气十足:数据显示特斯拉单车利润是比亚迪6倍
梦回Win98!196MB内存电脑成功启动Win11:开机时间要30分钟
环球看热讯:node中的优先从缓存中加载模块与模块的加载规则
LeetCode 239 滑动窗口最大值- Python手撕最大堆
环球新资讯:vscode编译java程序出现NoSuchMethodError的解决方法
价格又卷下来了!致态TiPlus 7100固态硬盘新史低:1TB仅589元
【天天快播报】澳大利亚一架波音737灭火飞机坠毁:全员奇迹生还
[JavaScript]实例化对象
天天热消息:【ctf权威竞赛指南笔记】(1)CTF
环球聚焦:全国城市地级市区县sql