最新要闻
- 每日热闻!如何干燥落叶以保持鲜艳的颜色_女孩成人礼父母送什么礼物好呢
- 会计专业特长岗位_会计专业特长_每日消息
- 根治续航焦虑!比亚迪腾势N7续航里程出炉:最远能跑702km
- 三体原声黑胶来了:限量1500份 带独立编码|全球即时
- 淄博鸭货店小哥回应因帅气走红:以为是自己鸭货做的好吃 天天播资讯
- 詹姆斯:我认为我会在攻防两端表现更好,我们要保护主场-天天看点
- 皮耶罗:尤文有些锋无力,不过还好有什琴斯尼 全球观点
- 瑞幸罚员工抄写差评100遍违法吗?网友吵翻|全球实时
- 极速下载!百度网盘超级会员12个月SVIP年卡直减100元:198元好价 焦点资讯
- 天天播报:上海车展已禁止车企送冰淇淋 主办方:属实
- 《孝庄秘史》被曝抄袭作家朴月小说 网友纷纷惊叹:你觉得可惜吗?-焦点精选
- 火影忍者鼬出现的集数-火影忍者鼬出场集数
- 环球热点!广西壮族自治区贺州市2023-04-22 07:55发布雷电黄色预警
- 真我11 Pro+真机照出炉:圆形镜组、白色素皮后盖
- 没对比没伤害!谷歌裁员上万人:CEO年薪却高达15亿
- 【速看料】日本天价芒果再创记录 单颗价格超1.5万人民币!
手机
iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?
警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案
- iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?
- 警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案
- 男子被关545天申国赔:获赔18万多 驳回精神抚慰金
- 3天内26名本土感染者,辽宁确诊人数已超安徽
- 广西柳州一男子因纠纷杀害三人后自首
- 洱海坠机4名机组人员被批准为烈士 数千干部群众悼念
家电
Go中响应式编程库RxGo详细介绍_天天最资讯
最近的项目用到了 RxGo ,因为之前从没有接触过,特意去学了学,特此记录下。文章很多内容是复制了参考资料或者官方文档。如果涉及侵权,请联系删除,谢谢。
1、RxGo简介
1.1 基础介绍
RxGo
是一个基于Go语言的响应式编程库,它提供了一种简单而强大的方式来处理异步事件流和数据流
。RxGo的设计灵感来自于ReactiveX,它提供了类似于ReactiveX的操作符和概念,如Observable、Observer、Subject、Scheduler等。
RxGo的目标是提供一种简单而强大的方式来处理异步事件流和数据流,使得开发人员可以更容易地编写高效、可维护和可扩展的代码。RxGo的特点包括:
(资料图片)
- 响应式编程:
RxGo
提供了Observable和Observer
两个核心概念,使得开发人员可以更容易地处理异步事件流和数据流。 - 操作符:
RxGo
提供了类似于ReactiveX的操作符,如map、filter、reduce等,使得开发人员可以更容易地对事件流进行转换、过滤和聚合等操作。 - 调度器:
RxGo
提供了调度器,使得开发人员可以更容易地控制事件流的执行线程和顺序。 - 可组合性:
RxGo
的操作符具有可组合性,使得开发人员可以更容易地组合多个操作符来实现复杂的操作。 - 高效性:
RxGo
的设计和实现都非常高效,可以处理大量的事件流和数据流。
总之,RxGo
是一个非常强大和实用的响应式编程库,它可以帮助开发人员更容易地处理异步事件流和数据流,提高代码的可维护性和可扩展性。
1.2 RxGo 数据流程图
RxGo的实现基于管道的概念。管道是由通道连接的一系列阶段,其中每个阶段是运行相同功能的一组goroutine。
- 使用
Just
操作符创建一个基于固定列表的静态可观测数据。 - 使用
Map
操作符定义了一个转换函数(把圆形变成方形)。 - 用
Filter
操作符过滤掉黄色方形。
从上面的例子中可以看出来,最终生成的数据被发送到一个通道中,消费者读取数据进行消费。RxGo
中有很多种消费和生成数据的方式,发布结果到通道中只是其中一种方式。
2、快速入门
2.1 安装 RxGo v2
go get -u github.com/reactivex/rxgo/v2
2.2 简单案例
我们先写一个简单的案例,来学习RxGo的简单使用。
package mainimport ( "fmt" "github.com/reactivex/rxgo/v2")func main() { observable := rxgo.Just(1, 2, 3, 4, 5)() ch := observable.Observe() for item := range ch { fmt.Println(item.V) }}
使用 RxGo 的一般流程如下:
- 使用相关的 Operator创建 Observable,Operator就是用来创建 Observable的。
- 中间各个阶段可以使用过滤操作筛选出我们想要的数据,使用转换操作对数据进行转换;
- 调用 Observable的
Observe()
方法,该方法返回一个<- chan rxgo.Item
。然后for range
遍历即可。
结合上面的这张图,我们就比较容易理解RxGo的数据处理流程。因为例子比较简单,没有用到Map、Filter
操作。
执行结果:
$ go run main.go 12345
Just
使用到柯里化的编程思想。柯里化(Currying)是一种函数式编程的技术,它将一个接受多个参数的函数转换成一系列接受单个参数的函数。这些单参数函数可以被组合起来,以便在后续的计算中使用。
柯里化的主要优点是它可以使函数更加灵活和可复用。通过将函数分解为一系列单参数函数,我们可以更容易地组合和重用这些函数,从而减少代码的重复性和冗余性。
例如:
//柯里化的例子func addCurried(x int) func(int) int {return func(y int) int {return x + y}}func main() {add5 := addCurried(5)fmt.Println(add5(10))}
由于 Go 不支持多个可变参数,Just
通过柯里化迂回地实现了这个功能:
//Just creates an Observable with the provided items.func Just(items ...interface{}) func(opts ...Option) Observable { return func(opts ...Option) Observable { return &ObservableImpl{ iterable: newJustIterable(items...)(opts...), } }}
Observe()
返回一个 Item 的chan ,Item的结构如下:
// Item is a wrapper having either a value or an error.typeItem struct {V interface{}E error}
所以通过Just生成observable对象时,传入的数据可以包含错误,在使用时通过 item.Error() 来区分。
func main() { observable := rxgo.Just(1, 2, errors.New("unknown"), 3, 4, 5)() ch := observable.Observe() for item := range ch { if item.Error() { fmt.Println("error:", item.E) } else { fmt.Println(item.V) } }}
我们使用item.Error()
检查是否出现错误。然后使用item.V
访问数据,item.E
访问错误。
除了使用for range
之外,我们还可以调用 Observable的ForEach()
方法来实现遍历。ForEach()
接受 3 个回调函数:
NextFunc
:类型为func (v interface {})
,传入的数据不包含错误类型时走此函数处理。ErrFunc
:类型为func (err error)
,当传入的数据包含错误时走此函数;CompletedFunc
:类型为func ()
,Observable完成时调用。
有点Promise
那味了。使用ForEach()
,可以将上面的示例改写为:
func main() { observable := rxgo.Just(1, 2, errors.New("这是一个测试错误!"), 4, 5)() <-observable.ForEach(func(v interface{}) { fmt.Println("received:", v) }, func(err error) { fmt.Println("error:", err) }, func() { fmt.Println("completed") })}
$ go run main.go received: 1received: 2error: 这是一个测试错误!received: 4received: 5completed
ForEach()
返回的是一个 chan,用于当 observable 关闭时会向此chan发送数据。所以在 observable
前面加了<-
来阻塞等待 ForEach()
处理完数据。
3、RxGo 深入学习
上面的简单案例,我们是使用Just
来创建observable
。其实还有其他的方式创建observable
。一起来看一看。
3.1 rxgo.Create
传入一个[]rxgo.Producer
的切片,其中rxgo.Producer
的类型为func(ctx context.Context, next chan<- Item)
。我们可以在代码中调用rxgo.Of(value)
生成数据,rxgo.Error(err)
生成错误,然后发送到next
通道中:
package mainimport ("context""errors""fmt""github.com/reactivex/rxgo/v2")func main() {observable := rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {next <- rxgo.Of(1)next <- rxgo.Of("aaa")next <- rxgo.Of(errors.New("test"))}})ch := observable.Observe()for item := range ch {if item.Error() {fmt.Println("err:", item.E)}else {fmt.Println(item.V)}}}
因为rxgo.Create中的参数是[]rxgo.Producer
,所以分成两个rxgo.Producer
也是一样的效果:
observable := rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) { next <- rxgo.Of(1) next <- rxgo.Of(2) next <- rxgo.Of(3) next <- rxgo.Error(errors.New("unknown")) }, func(ctx context.Context, next chan<- rxgo.Item) { next <- rxgo.Of(4) next <- rxgo.Of(5)}})
3.2 rxgo.FromChannel
FromChannel
可以直接从一个已存在的<-chan rxgo.Item
对象中创建 Observable:
package mainimport ("fmt""github.com/reactivex/rxgo/v2")func main() {ch := make(chan rxgo.Item)go func() {for i := 0; i < 5; i++ {ch <- rxgo.Of(i)}//需要手动关闭 ch 通道close(ch)}()observable := rxgo.FromChannel(ch)for item := range observable.Observe() {if item.Error() {fmt.Println("err:", item.E)}else {fmt.Println(item.V)}}}
注意:
通道需要手动调用
close()
关闭,上面Create()
方法内部rxgo
自动帮我们执行了这个步骤。
func newCreateIterable(fs []Producer, opts ...Option) Iterable {...go func() {// Create方法内部自动关闭了 next 通道defer close(next)for _, f := range fs {f(ctx, next)}}()...}
3.3 rxgo.Interval
Interval
以传入的时间间隔生成一个无穷的数字序列,从 0 开始:
func main() {observable := rxgo.Interval(rxgo.WithDuration(time.Second))for item := range observable.Observe() {if item.Error() {fmt.Println("err:", item.E)}else {fmt.Println(item.V)}}}
运行后,第一秒输出 0,第二秒输出 1,以此类推。
3.4 rxgo.Range
func main() { observable := rxgo.Range(0, 3) for item := range observable.Observe() { fmt.Println(item.V) }}
Range
可以生成一个范围内的数字:
上面代码依次输出 0,1,2,3。
3.5 Repeat
这个和之前的不太一样,这个是对已经存在的 observable
对象调用 Repeat
方法,从而实现重复生成数据。
package mainimport ("fmt""github.com/reactivex/rxgo/v2""time")func main() {observable := rxgo.Range(0,3).Repeat(2, rxgo.WithDuration(time.Second))for item := range observable.Observe() {if item.Error() {fmt.Println("err:", item.E)}else {fmt.Println(item.V)}}}
输出:
012012012
注意:这里执行的次数一共是3
次,Repeat中的参数是2,重复2次,一共3次。
3.6 rxgo.Start
可以给Start
方法传入[]rxgo.Supplier
作为参数,它可以包含任意数量的rxgo.Supplier
类型。rxgo.Supplier
的底层类型为:
var Supplier func(ctx context.Context) rxgo.Item
Observable内部会依次调用这些rxgo.Supplier
生成rxgo.Item
:
package mainimport ("context""fmt""github.com/reactivex/rxgo/v2""time")func Supplier1(ctx context.Context) rxgo.Item {deadline, ok := ctx.Deadline()fmt.Println("Supplier1", deadline, ok)time.Sleep(time.Second)return rxgo.Of(1)}func Supplier2(ctx context.Context) rxgo.Item {deadline, ok := ctx.Deadline()fmt.Println("Supplier2", deadline, ok)time.Sleep(time.Second)return rxgo.Of(2)}func Supplier3(ctx context.Context) rxgo.Item {deadline, ok := ctx.Deadline()fmt.Println("Supplier3", deadline, ok)time.Sleep(time.Second)return rxgo.Of(3)}func main() {ctx, _ := context.WithTimeout(context.Background(), time.Second*2)observable := rxgo.Start([]rxgo.Supplier{Supplier1, Supplier2, Supplier3}, rxgo.WithContext(ctx))for item := range observable.Observe() {fmt.Println(item.V)}}
4、Observable 分类
根据数据在何处生成,Observable被分为 Hot和 Cold两种类型。
- Hot Observable:热可观测量,数据由可观测量外部产生。
- Cold Observable:冷可观测量,数据由可观测量内部产生。
通常不想一次性的创建所有的数据,使用 热可观测量。
4.1 热可观测量示例
func main() { ch := make(chan rxgo.Item) go func() { for i := 0; i < 3; i++ { ch <- rxgo.Of(i) } close(ch) }() observable := rxgo.FromChannel(ch) for item := range observable.Observe() { fmt.Println(item.V) } for item := range observable.Observe() { fmt.Println(item.V) }}
结果:
012
上面创建的是 Hot Observable。但是有个问题,第一次Observe()
消耗了所有的数据,第二个就没有数据输出了。(可以用可连接的观测量来修改这一行为,后面再说)。
4.2 冷可观测量示例
Cold Observable就不会有这个问题,因为它创建的流是独立于每个观察者的。即每次调用Observe()
都创建一个新的 channel。我们使用Defer()
方法创建 Cold Observable,它的参数与Create()
方法一样。
func main() { observable := rxgo.Defer([]rxgo.Producer{func(_ context.Context, ch chan<- rxgo.Item) { for i := 0; i < 3; i++ { ch <- rxgo.Of(i) } }}) for item := range observable.Observe() { fmt.Println(item.V) } for item := range observable.Observe() { fmt.Println(item.V) }}
Defer源码介绍:
// Defer does not create the Observable until the observer subscribes,// and creates a fresh Observable for each observer.func Defer(f []Producer, opts ...Option) Observable {return &ObservableImpl{iterable: newDeferIterable(f, opts...),}}
执行结果:
$ go run main.go012012
4.3 可连接的 Observable
可连接的(Connectable)Observable对普通的 Observable进行了一层组装。调用它的Observe()
方法时并不会立刻产生数据。使用它,我们可以等所有的观察者都准备就绪了(即调用了Observe()
方法)之后,再调用其Connect()
方法开始生成数据。我们通过两个示例比较使用普通的 Observable和可连接的 Observable有何不同。
4.3.1 普通的Observable,并不是可连接的Observable
func main() { ch := make(chan rxgo.Item) go func() { for i := 1; i <= 3; i++ { ch <- rxgo.Of(i) } close(ch) }() observable := rxgo.FromChannel(ch) observable.DoOnNext(func(i interface{}) { fmt.Printf("First observer: %d\n", i) }) time.Sleep(3 * time.Second) fmt.Println("before subscribe second observer") observable.DoOnNext(func(i interface{}) { fmt.Printf("Second observer: %d\n", i) }) time.Sleep(3 * time.Second)}
上例中我们使用DoOnNext()
方法来注册观察者。由于DoOnNext()
方法是异步执行的,所以为了等待结果输出,在最后增加了一行time.Sleep
。运行结果:
First observer: 1First observer: 2First observer: 3before subscribe second observer
由输出可以看出,注册第一个观察者之后就开始产生数据了。第二个观察者并不会得到数据。
4.3.2 可连接的Observable
通过在创建 Observable的方法中指定rxgo.WithPublishStrategy()
选项就可以创建可连接的 Observable:
- 重点是传入
rxgo.WithPublishStrategy()
func main() { ch := make(chan rxgo.Item) go func() { for i := 1; i <= 3; i++ { ch <- rxgo.Of(i) } close(ch) }() observable := rxgo.FromChannel(ch, rxgo.WithPublishStrategy()) observable.DoOnNext(func(i interface{}) { fmt.Printf("First observer: %d\n", i) }) time.Sleep(3 * time.Second) fmt.Println("before subscribe second observer") observable.DoOnNext(func(i interface{}) { fmt.Printf("Second observer: %d\n", i) }) //需要手动调用 observable.Connect 才会产生数据 observable.Connect(context.Background()) time.Sleep(3 * time.Second)}
运行输出:
$ go run main.gobefore subscribe second observerSecond observer: 1First observer: 1First observer: 2First observer: 3Second observer: 2Second observer: 3
上面是等两个观察者都注册之后,并且手动调用了 Observable 的Connect()
方法才产生数据。而且可连接的 Observable有一个特性:它是冷启动的!!!,即每个观察者都会收到一份相同的拷贝。
5、转换 Observable
通过 RxGo 数据流程图
我们知道,我们可以对rxgo.Item
进行转换。rxgo 提供了很多转换函数,下面一起来学一学这些转换函数。
5.1 Map
Map()
方法简单修改它收到的rxgo.Item
然后发送到下一个阶段(转换或过滤)。Map()
接受一个类型为func (context.Context, interface{}) (interface{}, error)
的函数。第二个参数就是rxgo.Item
中的数据,返回转换后的数据。如果出错,则返回错误。
func main() {observable := rxgo.Just(1, 2, 3)()observable = observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {return i.(int), nil}).Map(func(_ context.Context, i interface{}) (interface{}, error) {b := i.(int)if b % 2 == 0 {return nil, errors.New("test")} else {return i, nil}})for item := range observable.Observe() {fmt.Println(item.V)}}
上例中每个数字经过两个Map
,第一个Map
逻辑是原样输出
,第二个Map
逻辑是判断i是不是偶数,如果是偶数,就返回错误,否则原样输出
。运行结果:
1
我们将第一个Map中的语句改为下面的逻辑:
return i.(int) + 1, nil
运行结果:
我们可以知道,数据的处理是串行的,第一个数据执行完所有的Map过后,第二个数据才会执行,当其中某一个执行返回的结果包含错误,就不会继续进行转换了,即不会数据不会进入到 Observe()
中的通道中去。
5.2 Marshal
Marshal
对经过它的数据进行一次Marshal
。这个Marshal
可以是json.Marshal/proto.Marshal
,甚至我们自己写的Marshal
函数。它接受一个类型为func(interface{}) ([]byte, error)
的函数用于对数据进行处理。
type User struct { Name string `json:"name"` Age int `json:"age"`}func main() { observable := rxgo.Just( User{ Name: "dj", Age: 18, }, User{ Name: "jw", Age: 20, }, )() observable = observable.Marshal(json.Marshal) for item := range observable.Observe() { fmt.Println(string(item.V.([]byte))) }}
执行结果:
{"name":"dj","age":18}{"name":"jw","age":20}
由于Marshal
操作返回的是[]byte
类型,我们需要进行类型转换之后再输出。
5.3 Unmarshal
既然有Marshal
,也就有它的相反操作Unmarshal
。Unmarshal
用于将一个[]byte
类型转换为相应的结构体或其他类型。与Marshal
不同,Unmarshal
需要知道转换的目标类型,所以需要提供一个函数用于生成该类型的对象。然后将[]byte
数据Unmarshal
到该对象中。Unmarshal
接受两个参数,参数一是类型为func([]byte, interface{}) error
的函数,参数二是func () interface{}
用于生成实际类型的对象。我们拿上面的例子中生成的 JSON 字符串作为数据,将它们重新Unmarshal
为User
对象:
type User struct { Name string `json:"name"` Age int `json:"age"`}func main() { observable := rxgo.Just( `{"name":"dj","age":18}`, `{"name":"jw","age":20}`, )() observable = observable.Map(func(_ context.Context, i interface{}) (interface{}, error) { return []byte(i.(string)), nil }).Unmarshal(json.Unmarshal, func() interface{} { return &User{} }) for item := range observable.Observe() { fmt.Println(item.V) }}
由于Unmarshaller
接受[]byte
类型的参数,我们在Unmarshal
之前加了一个Map
用于将string
转为[]byte
。运行结果:
&{dj 18}&{jw 20}
5.4 Buffer
Buffer
按照一定的规则收集接收到的数据,然后一次性发送出去(作为切片),而不是收到一个发送一个。有 3 种类型的Buffer
:
BufferWithCount(n)
:每收到n
个数据发送一次,最后一次可能少于n
个;BufferWithTime(n)
:发送在一个时间间隔n
内收到的数据;BufferWithTimeOrCount(d, n)
:收到n
个数据,或经过d
时间间隔,发送当前收到的数据。
5.4.1 BufferWithCount
func main() {observable := rxgo.Range(0, 5)observable = observable.BufferWithCount(2)for item := range observable.Observe() {fmt.Println(item.V)}}
执行结果:
[0 1][2 3][4]
最后一组只有一个。
5.4.2 BufferWithTime
unc main() {ch := make(chan rxgo.Item, 1)go func() {i := 0for range time.Tick(time.Second) {ch <- rxgo.Of(i)i++}}()observable := rxgo.FromChannel(ch).BufferWithTime(rxgo.WithDuration(2 * time.Second))layout := "2006-01-02 13:04:05"fmt.Println("startTime", time.Now().Format(layout))for item := range observable.Observe() {fmt.Println(item.V)fmt.Println("nextTime", time.Now().Format(layout))}}
执行结果是不确定的,这里需要注意:
startTime 2023-04-22 44:15:49[0]nextTime 2023-04-22 44:15:51[1 2]nextTime 2023-04-22 44:15:53[3 4 5]nextTime 2023-04-22 44:15:55...
5.4.3 BufferWithTimeOrCount
func main() {ch := make(chan rxgo.Item, 1)go func() {i := 0for range time.Tick(time.Second) {ch <- rxgo.Of(i)i++}}()observable := rxgo.FromChannel(ch).BufferWithTimeOrCount(rxgo.WithDuration(2*time.Second), 2)layout := "2006-01-02 13:04:05"fmt.Println("startTime", time.Now().Format(layout))for item := range observable.Observe() {fmt.Println(item.V)fmt.Println("nextTime", time.Now().Format(layout))}}
执行结果:
startTime 2023-04-22 44:18:48[0]nextTime 2023-04-22 44:18:50[1 2]nextTime 2023-04-22 44:18:51[3 4]nextTime 2023-04-22 44:18:53
BufferWithTimeOrCount
是以BufferWithCount、BufferWithTime
谁先满足条件为准,谁先满足谁就先执行。
5.5 GroupBy
``GroupBy将一个
Observable分成多个
子Observable,每个
子Observable`包含相同的索引值的元素。
GroupBy
函数定义如下:
GroupBy(length int, distribution func(Item) int, opts ...Option) Observable
即将一个Observable
分成length个子Observable
,根据distribution
函数返回的int作为分组的依据。
package mainimport ("fmt""github.com/reactivex/rxgo/v2")func main() {// 创建一个Observable,它发出一些整数值source := rxgo.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)()// 使用GroupBy操作符将整数值按照奇偶性进行分组grouped := source.GroupBy(2, func(item rxgo.Item) int {return item.V.(int) % 2}, rxgo.WithBufferedChannel(10))for subObservable := range grouped.Observe() {fmt.Println("new subObservable ------ ")for item := range subObservable.V.(rxgo.Observable).Observe() {fmt.Printf("%v\n", item.V)}}}
上面根据每个数模 3 的余数将整个流分为 3 组。运行:
new subObservable ------ 246810new subObservable ------ 13579
注意rxgo.WithBufferedChannel(10)
的使用,由于我们的数字是连续生成的,依次为 0->1->2->…->9->10。而 Observable默认是惰性的,即由Observe()
驱动。内层的Observe()
在返回一个 0 之后就等待下一个数,但是下一个数 1 不在此 Observable中。所以会陷入死锁。使用rxgo.WithBufferedChannel(10)
,设置它们之间的连接 channel 缓冲区大小为 10,这样即使我们未取出 channel 里面的数字,上游还是能发送数字进来。
6、并行操作
默认情况下,这些转换操作都是串行的,即只有一个 goroutine 负责执行转换函数。从上面的Map
操作也可以得知默认是串行执行的。可以改变这一默认行为,使用rxgo.WithPool(n)
选项设置运行n
个 goroutine,或者rxgo.WitCPUPool()
选项设置运行与逻辑 CPU 数量相等的 goroutine。
package mainimport ("context""fmt""github.com/reactivex/rxgo/v2""math/rand""time")func main() {observable := rxgo.Range(1, 10)observable = observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {time.Sleep(time.Duration(rand.Int31()))return i.(int) + 1, nil}, rxgo.WithCPUPool())for item := range observable.Observe() {fmt.Println(item.V)}}
结果:
891065112473
由于是并行运算,所以结果是不固定的。
我们可以直接看官网的介绍:https://github.com/ReactiveX/RxGo/blob/v2.5.0/doc/options.md
7、过滤 Observable
我们可以对Observable中发送过来的数据进行过滤,过滤掉不需要的数据,有以下方式:
Filter
ElementAt
Debounce
Distinct
Skip
Take
下面的内容大多来自官方的示例,地址:https://github.com/ReactiveX/RxGo/tree/v2.5.0/doc
7.1 Filter
Filter()
接受一个类型为func (i interface{}) bool
的参数,通过的数据使用这个函数断言,返回true
的将发送给下一个阶段。否则,丢弃。
package mainimport ("fmt""github.com/reactivex/rxgo/v2")func main() {observable := rxgo.Just(1, 2, 3)().Filter(func(i interface{}) bool {return i != 2})for item := range observable.Observe() {fmt.Println(item.V)}}
结果:
13
7.2 ElementAt
ElementAt()
只发送指定索引的数据,如ElementAt(2)
只发送索引为 2 的数据,即第 3 个数据。
package mainimport ("fmt""github.com/reactivex/rxgo/v2")func main() {observable := rxgo.Just(0, 1, 2, 3, 4)().ElementAt(2)for item := range observable.Observe() {fmt.Println(item.V)}}
结果:
2
7.3 Debounce
只有当特定的时间跨度已经过去而没有发出另一个Item
时,才从Observable发出一个Item
。
package mainimport ("fmt""github.com/reactivex/rxgo/v2""time")func main() {ch := make(chan rxgo.Item)go func() {ch <- rxgo.Of(1)time.Sleep(2 * time.Second)ch <- rxgo.Of(2)ch <- rxgo.Of(3)time.Sleep(2 * time.Second)close(ch)}()observable := rxgo.FromChannel(ch).Debounce(rxgo.WithDuration(1 * time.Second))for item := range observable.Observe() {fmt.Println(item.V)}}
结果:
13
上面示例,先收到 1,然后 2s 内没收到数据,所以发送 1。接着收到了数据 2,由于马上又收到了 3,所以 2 不会发送。收到 3 之后 2s 内没有收到数据,发送了 3。所以最后输出为 1,3。
7.4 Distinct
Distinct()
会记录它发送的所有数据,它不会发送重复的数据。由于数据格式多样,Distinct()
要求我们提供一个函数,根据原数据返回一个唯一标识码(有点类似哈希值)。基于这个标识码去重。
package mainimport ("context""fmt""github.com/reactivex/rxgo/v2")func main() {observable := rxgo.Just(1, 2, 2, 3, 4, 4, 5)().Distinct(func(_ context.Context, i interface{}) (interface{}, error) {return i, nil})for item := range observable.Observe() {fmt.Println(item.V)}}
结果:
12345
7.5 Skip
Skip
可以跳过前若干个数据。
package mainimport ("fmt""github.com/reactivex/rxgo/v2")func main() {observable := rxgo.Just(1, 2, 3, 4, 5)().Skip(2)for item := range observable.Observe() {fmt.Println(item.V)}}
结果:
345
7.6 Take
Take
只取前若干个数据。
package mainimport ("fmt""github.com/reactivex/rxgo/v2")func main() {observable := rxgo.Just(1, 2, 3, 4, 5)().Take(2)for item := range observable.Observe() {fmt.Println(item.V)}}
结果:
12
8、选项
因为golang中不支持默认参数,所以我们经常会用到选项设计模式,rxgo中也大量使用到了此模式。
rxgo.WithBufferedChannel(10)
:设置 channel 的缓存大小;rxgo.WithPool(n)/rxgo.WithCpuPool()
:使用多个 goroutine 执行转换操作;rxgo.WithPublishStrategy()
:使用发布策略,即创建可连接的 Observable。
rxgo还有很多其他选项,具体看官方文档,地址:
https://github.com/ReactiveX/RxGo/blob/v2.5.0/doc/options.md
9、简化的真实案例
假设现在有一个定时处理任务,结构如下:
type ScheduledTask struct {RecordId intHandleStartTime time.TimeStatus bool}
在执行具体的任务时,需要去数据库查询下是否已经被取消了,如果已经被取消掉的,则不再执行。
完整代码如下:
package mainimport ("fmt""github.com/reactivex/rxgo/v2""time")type ScheduledTask struct {RecordId intHandleStartTime stringStatus bool}func main() {ch := make(chan rxgo.Item)go producer(ch)time.Sleep(time.Second*3)observable := rxgo.FromChannel(ch)observable = observable.Filter(func(i interface{}) bool {st := i.(*ScheduledTask)return st.Status}, rxgo.WithBufferedChannel(1))// 消费可观测量for customer := range observable.Observe() {st := customer.V.(*ScheduledTask)fmt.Printf("resutl: --> %+v\n", st)}}func producer(ch chan <- rxgo.Item) {for i := 0; i < 10; i++ {status := falseif i % 2 == 0 {status = true}st := &ScheduledTask{RecordId: i,HandleStartTime: time.Now().Format("2006-01-02 13:04:05"),Status: status,}ch <- rxgo.Of(st)} // 这里千万不要忘记了close(ch)}
结果:
resutl: --> &{RecordId:0 HandleStartTime:2023-04-22 46:04:07 Status:true}resutl: --> &{RecordId:2 HandleStartTime:2023-04-22 46:04:10 Status:true}resutl: --> &{RecordId:4 HandleStartTime:2023-04-22 46:04:10 Status:true}resutl: --> &{RecordId:6 HandleStartTime:2023-04-22 46:04:10 Status:true}resutl: --> &{RecordId:8 HandleStartTime:2023-04-22 46:04:10 Status:true}
参考链接
Go 每日一库之 rxgo
[官方例子](
关键词:
-
世界速看:Springboot 多实例负载均衡部署
Springboot多实例负载均衡部署一、测试代码:控制层测试代码:importjava net Inet4Address;importjava net InetAddress;imp
来源: Go中响应式编程库RxGo详细介绍_天天最资讯
CentOS7---基于 CentOS 7 构建 LVS-DR 群集_世界热点
世界速看:Springboot 多实例负载均衡部署
每日热闻!如何干燥落叶以保持鲜艳的颜色_女孩成人礼父母送什么礼物好呢
会计专业特长岗位_会计专业特长_每日消息
根治续航焦虑!比亚迪腾势N7续航里程出炉:最远能跑702km
三体原声黑胶来了:限量1500份 带独立编码|全球即时
淄博鸭货店小哥回应因帅气走红:以为是自己鸭货做的好吃 天天播资讯
天天亮点!语言录制兼容长按跟点击录制
詹姆斯:我认为我会在攻防两端表现更好,我们要保护主场-天天看点
皮耶罗:尤文有些锋无力,不过还好有什琴斯尼 全球观点
瑞幸罚员工抄写差评100遍违法吗?网友吵翻|全球实时
极速下载!百度网盘超级会员12个月SVIP年卡直减100元:198元好价 焦点资讯
天天播报:上海车展已禁止车企送冰淇淋 主办方:属实
《孝庄秘史》被曝抄袭作家朴月小说 网友纷纷惊叹:你觉得可惜吗?-焦点精选
火影忍者鼬出现的集数-火影忍者鼬出场集数
环球热点!广西壮族自治区贺州市2023-04-22 07:55发布雷电黄色预警
真我11 Pro+真机照出炉:圆形镜组、白色素皮后盖
没对比没伤害!谷歌裁员上万人:CEO年薪却高达15亿
【速看料】日本天价芒果再创记录 单颗价格超1.5万人民币!
环球播报:世界读书日丨作家清扬婉兮荐读:《在熟悉的家中向世界道别》
精彩看点:【manim动画教程】--目录(完结)
【全球独家】Rust编程语言入门之模式匹配
滚动:meta标签的一些属性描述
红高粱电视剧主题曲九儿歌词_红高粱主题曲九儿歌词 每日简讯
环球讯息:神速!第200家AITO用户中心开业 余承东:让车主用车更方便、更放心
今日热闻!不止领克08!魅族Flyme Auto生态扩圈:牵手吉利高端品牌极星
国家安全 全民守护!赵全营镇开展全民国家安全日宣传活动
LPR连续8个月“原地踏步” 二季度实体经济融资成本仍有望保持低位
电脑装机兼容性检查
4月份沪牌拍卖结果公布:中标率11%,平均成交价92412元-每日快播
世界快看点丨男子离奇失踪7天 人找到时已失温:科普危害有多大
男子坐飞机和妈祖像邻座:刚开始有点慌 被告知是幸运不用怕
离奇!印度一奶牛被火车撞飞30米砸死铁路边撒尿老人
福岛核污染水排海可能于7月开始实施:海底隧道6月底完工_今日看点
拉萨市绿色工业招商引资推介会在南京举行|当前热闻
最新消息:3239套保障性租赁住房加快建设中哈密市保障性安居工程建设迎新进展
小米AX9000的Docker能做什么
天天观焦点:《操作系统原型--xv6分析与实验》第一章:qemu启动xv6问题记录
HCIP-Datacom-Core 2.1 IS-IS基础实验
每日焦点!vue-admin-template 如何添加快捷导航(标签导航栏)
张歆艺晒和窦骁何超莲合影,被指修图太狠,袁弘穿花衬衫参加晚宴
2.8K高刷屏+天玑9000!vivo Pad2线下提前开售:2799元起 天天头条
斯嘉丽说不会再演黑寡妇了:影迷痛哭 天天新动态
焦点快看:浙江水库大坝端掉罕见一王三后白蚁巢:孕育百万级白蚁集团
樱子小姐的脚下埋着尸体08卷第一章:表与里02
文章学习:基于AVX-512指令集的同态加密算法中大整数运算性能优化与突破
最新消息:Java中常用不可变类
网传深圳二手房参考价取消:有业主连夜涨200万,降价房源比涨价多
险资配置遇压力:资金充沛 优质资产难寻|看热讯
【世界时快讯】多省都要封杀!老头乐事故可构成交通肇事罪 大爷们只能买新能源车、考驾照?
世界通讯!纯净的寓言
天天热讯:性能测试的一些专业概念
环球快消息!昂利康:4月20日召开业绩说明会,投资者参与
责任准备金评估利率或下调 监管部门指导险企储备新产品
使用 Spring Cloud Bus 在微服务之间传递消息示例
北方将现大范围雨雪局地大暴雪:局地冷到破纪录 秋裤快穿回来 全球观点
0秒吸水:亚光加厚纯棉大浴巾23.9元(单条450g)
【全球速看料】学系统集成项目管理工程师(中项)系列03_职业道德规范
【环球时快讯】德宏股份:2022年净利润1107.61万元 同比下降71.54%
【环球财经】英国4月份制造业和服务业复苏分化进一步明显
环球观察:周鸿祎发全员信 确认这类360员工不会被GPT取代!
世界快讯:精研“模考”功能
观察:罗翔在新东方当过三年老师 俞敏洪:讲课幽默风格是被新东方训练的
国内用户无缘!iOS 17将支持三方应用商店或仅限欧洲:苹果准备收费
商品日报(4月21日):负反馈担忧加重黑色系全线下跌 乙二醇逆势涨超2%
快消息!博敏电子:4月21日融券卖出10.9万股,融资融券余额4.75亿元
安徽宁国经开区新型储能项目战略合作签约,预期投资总规模达30亿元
【全球播资讯】AI教你玩游戏
微资讯!厂商疯狂备货!RTX 4060 Ti下月杀到 老黄会良心到卖2800元?
摔角动态外媒爆料蛋妞VS米兹的头号挑战者赛两分钟草草了事的原因
全球热讯:FirewallD入门手册
2023.4.21【图论】点分治|世界新视野
全球视讯!学系统集成项目管理工程师(中项)系列07_信息(文档)管理
小米辣椒的腌制方法放一年都不坏?_新消息
看到洋人吃冰激凌,这样的场景应该珍惜
今日播报!宝马MINI称两名女生不是公司员工 不会再出现:回应求原谅 你还会买吗?
世界焦点!内地票房破2亿!《灌篮高手》登顶淘宝热搜:周边被抢空
三方签约共建华侨大学国家语言服务出口基地 今日报
买到烂尾楼“钱房两空”?最高法明确优先保护购房者权益|每日资讯
世界看点:面对特斯拉掀起的价格战,有“勇士”选择应战,有“逆行者”坚持高端
电脑c盘怎么除了系统其他文删除件_怎样清理c盘除了系统之外的东西
天天快看:张小泉客服回应菜刀拍蒜断两截:比较硬的刀就容易断裂
解决信号盲区!曝小米Civi 3支持5G异网漫游:光明正大“蹭”网 当前关注
环球最资讯丨10倍提升 安卓新旗舰放弃祖传USB2.0:体验变化太明显了
网上的那些喷子 为啥一玩这类游戏就闭嘴了?-全球热点评
热门:i7处理器32GB+1TB仅2999 这款性价比神机只有巴掌大小
潍坊开展“10+”行动 助力工业经济高质量发展 天天新消息
新能源产品集结,长安加速向全球企业转型 当前快播
英国宣布制裁5名俄公民,梅德韦杰夫回应:英国是俄罗斯永远的敌人
华仁物业2022年净利636.57万元,同比减少34.7%|年报
千的组词_关于千的组词
六张iPad绘制的交通卡卡面 是苹果联合创作者为世界地球日送上的绿色礼物_微动态
潭村站_关于潭村站介绍-世界热议
山西一妈妈为催婚给25岁儿子床头摆稻草人 不料小伙儿人间清醒,反告诫勿迷信 环球热门
青春无价 《灌篮高手》首日即破12项纪录!
世界新资讯:马斯克称特斯拉今年将推出全自动驾驶技术
蚌埠:马天奇带队赴长三角地区考察招商 当前速读
黄辣丁和乌鱼能一起养吗 黄辣丁和乌鱼能不能养在一起呢 环球关注
郑州公积金可以取现吗?提取的条件有哪些?-全球简讯