ReactiveX简称为Rx,是基于事件流的一个异步编程API,其中事件流被称为Observable,事件流需要被Observer订阅才有意义。

Rx有多种编程语言实现,RxJava/RxJS/Rx.NET/RxClojure/RxSwift。其中RxGo是Rx的Go语言实现。

下面就来介绍下RxGo中常用的算子使用样例。

Observable & Observe

RxGO中事件流被抽象成ObservableObservable中的事件需要被订阅之后才能被消费,使用Observe进行订阅。
下面是段创建事件流和订阅的代码段:

1
2
3
4
observable := rxgo.Just("Hello, World!")()
ch := observable.Observe()
item := <-ch
fmt.Println(item.V)

Transforming算子

Buffer

根据某个规则将Observable中的事件缓存下来然后一起发送出去。这个规则可以是根据事件的条数(BufferWithCount)或者根据时间间隔(BufferWithTime),当然也可以根据时间间隔和事件条数(BufferWithTimeOrCount)。

下面的样例是根据事件条数的缓存规则进行发送事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
"fmt"
"github.com/reactivex/rxgo/v2"
"time"
)

func main() {
// Create the producer
ch := make(chan rxgo.Item, 1)
go func() {
i := 0
for range time.Tick(time.Second) {
ch <- rxgo.Of(i)
i++
}
}()

observable := rxgo.FromChannel(ch).BufferWithCount(3)

for item := range observable.Observe() {
fmt.Println(item.V)
}
}

每秒向channel中发送一个数字,Observable缓存3个,然后发出,输出结果如下:

1
2
3
4
[0 1 2]
[3 4 5]
[6 7 8]
[9 10 11]

观察输出的内容可以看出item.V是一个[]interface{},如果不使用Buffer算子的话,item.V是一个interface{},所以Buffer算子其后再接其他算子的时候需要注意其事件是一个Slice,而不再是一个普通的interface{}

FlatMap

FlatMap是将Observable中的事件解析成一个Observable,代码示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package main

import (
"fmt"
"github.com/reactivex/rxgo/v2"
"strings"
)

func main() {
observable := rxgo.Just("Hello, World!", "World", "Golang World")().FlatMap(func(item rxgo.Item) rxgo.Observable {
obs := rxgo.Just(strings.Split(item.V.(string), " "))()
return obs
})

for item := range observable.Observe() {
fmt.Println(item.V)
}
}

FlatMap将每个事件根据" "进行拆分成一个slice,并将其封装成一个Observable发送出去。其输出内容如下:

1
2
3
4
5
Hello,
World!
World
Golang
World

Map

Map是将Observable中的一个事件封装成另一个事件,发送出去。使用方式较为简单,其示例代码如下:

1
2
3
4
observable := rxgo.Just(1, 2, 3)().
Map(func(_ context.Context, i interface{}) (interface{}, error) {
return i.(int) * 10, nil
})

输出如下内容:

1
2
3
10
20
30

Scan

Observable中的一个事件apply一个function,并将其处理的结果发送出去,其使用方式类似Reduce算子,示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
"context"
"fmt"
"github.com/reactivex/rxgo/v2"
)

func main() {

observable := rxgo.Just("a", "b", "c", "d")().
Scan(func(ctx context.Context, i interface{}, i2 interface{}) (interface{}, error) {
if i == nil {
return i2, nil
}
return fmt.Sprintf("%s,%s", i.(string), i2.(string)), nil
})

for item := range observable.Observe() {
fmt.Println(item.V)
}
}

示例是将目前发送过的事件进行拼接,然后进行发送。其输出结果如下:

1
2
3
4
a
a,b
a,b,c
a,b,c,d

GroupByDynamic

根据规则将Observable中的事件进行分组(分组个数没有限制),然后将其分组结果封装成GroupedObservables发送出去。示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
"fmt"
"github.com/reactivex/rxgo/v2"
"strconv"
"time"
)

func main() {

observable := rxgo.Range(0, 24).GroupByDynamic(func(item rxgo.Item) string {
return strconv.Itoa(item.V.(int) % 2)
}, rxgo.WithBufferedChannel(12))

for i := range observable.Observe() {
groupedObservable := i.V.(rxgo.GroupedObservable)
key := groupedObservable.Key
fmt.Printf("New observable: %s\n", key)

for i := range groupedObservable.Observe() {
fmt.Printf("key: %s, item: %v\n", key, i.V)
}
}
}

GroupByDynamic中的func定义了分组的规则strconv.Itoa(item.V.(int) % 2),也就是说分组的key就是strconv.Itoa(item.V.(int) % 2)Observe之后从channel中接收GroupedObservable,然后再进行数据获取。其输出结果如下:

1
2
3
4
5
6
7
8
New observable: 0
key: 0, item: 0
...
key: 0, item: 22
New observable: 1
key: 1, item: 1
...
key: 1, item: 23

不过这里有个buff需要注意下,GroupByDynamic中传入了rxgo.WithBufferedChannel(12),这个buff是用来做什么的呢?长度又是怎么定义的呢?

通过源码可得知这个buff是一个channel,是用来存储分组中的元素的。在上面的例子中如果buff的长度设置的小于12,其程序会被block住。之所以会被block住是因为当前程序从Observe中订阅数据是串行的,只消费了0号分组中buff中的数据,而1号分组中buff的数据并没有消费,其channel已填满数据,无法再写入数据,所以程序被block住了。

所以这个长度之所以是12,是因为24个数,分2组,每组12个,buff的长度又正好是12个,串行消费数据并不会造成channel阻塞。

但是实际使用过程中并不能准备评估每组的个数,那这个buff又该怎么设置呢?

解决方法是协程来订阅GroupedObservable,调整后的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package main

import (
"fmt"
"github.com/reactivex/rxgo/v2"
"time"
)

func main() {

observable := rxgo.Range(0, 24).GroupByDynamic(func(item rxgo.Item) string {
if item.V.(int) <= 5 {
return "1"
} else if item.V.(int) <= 11 {
return "2"
} else if item.V.(int) <= 17 {
return "1"
} else if item.V.(int) <= 23 {
return "2"
} else {
return "3"
}
}, rxgo.WithBufferedChannel(2))

for i := range observable.Observe() {
go func() {
groupedObservable := i.V.(rxgo.GroupedObservable)
key := groupedObservable.Key
fmt.Printf("New observable: %s\n", key)

for i := range groupedObservable.Observe() {
fmt.Printf("key: %s, item: %v\n", key, i.V)
}
}()

}

time.Sleep(1 * time.Hour)
}

使用协程来订阅GroupedObservable,每个分组中的数据都能被及时消费掉,并不会造成buff阻塞,所以这里buff只是设置成rxgo.WithBufferedChannel(2)

Window

Window算子较为简单,根据事件个数或者时间间隔将窗口中的事件发送到一个Observable中。其中Window算子与Buffer算子效果上类似,只是Window是将窗口中的事件封装成一个Observable,而Buffer算子是将缓存中的事件当成一个新的事件发出。

Window算子示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package main

import (
"fmt"
"github.com/reactivex/rxgo/v2"
)

func main() {
observe := rxgo.Just(1, 2, 3, 4, 5)().WindowWithCount(2).Observe()

for item := range observe {
fmt.Println("new obs...")
for i := range item.V.(rxgo.Observable).Observe() {
fmt.Println(i.V)
}
}
}

输出的结果如下:

1
2
3
4
5
6
7
8
new obs...
1
2
new obs...
3
4
new obs...
5

Filtering算子

Filter

Filter算子用于过滤符合条件的事件,使用方式较为简单,其代码如下:

1
2
3
4
observable := rxgo.Just(1, 2, 3)().
Filter(func(i interface{}) bool {
return i != 2
})

Combining算子

Join

Join是将两个Observable中规定时间间隔中的事件进行笛卡尔积处理,示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
"context"
"fmt"
"github.com/reactivex/rxgo/v2"
"time"
)

func main() {

observable := rxgo.Just(
map[string]int64{"tt": 1, "V": 1},
map[string]int64{"tt": 4, "V": 2},
map[string]int64{"tt": 7, "V": 3},
)().Join(func(ctx context.Context, l interface{}, r interface{}) (interface{}, error) {
return map[string]interface{}{
"l": l,
"r": r,
}, nil
}, rxgo.Just(
map[string]int64{"tt": 2, "V": 5},
map[string]int64{"tt": 3, "V": 6},
map[string]int64{"tt": 5, "V": 7},
)(), func(i interface{}) time.Time {
return time.Unix(0, i.(map[string]int64)["tt"]*1000000)
}, rxgo.WithDuration(2*time.Millisecond))

for item := range observable.Observe() {
fmt.Println(fmt.Sprintf("item:%v", item.V))
}
}

Join算子是将固定时间间隔中的事件进行处理,这个间隔是由rxgo.WithDuration(2*time.Millisecond)控制的,时间获取方式是又第三个参数func(i interface{}) time.Time方法决定的。

两条流中的事件是否进行join,是事件根据func(i interface{}) time.Time获取时间,两者时间的差值是否在rxgo.WithDuration(2*time.Millisecond)中,如果再则join。

上例中左流中的"tt": 1与右流中的"tt": 2"tt": 3进行join,不与"tt": 5进行join是因为5-1=4>rxgo.WithDuration(2*time.Millisecond)

其输出结果如下:

1
2
3
4
5
6
item:map[l:map[V:1 tt:1] r:map[V:5 tt:2]]
item:map[l:map[V:1 tt:1] r:map[V:6 tt:3]]
item:map[l:map[V:2 tt:4] r:map[V:5 tt:2]]
item:map[l:map[V:2 tt:4] r:map[V:6 tt:3]]
item:map[l:map[V:2 tt:4] r:map[V:7 tt:5]]
item:map[l:map[V:3 tt:7] r:map[V:7 tt:5]]

Mathematical和Aggregate算子

Average

计算Observable中事件的平均值,示例代码如下:

1
observable := rxgo.Just(1, 2, 3, 4)().AverageInt() // 输出 2

实际使用中,由于Observable都是Hot Observable,所以Average类的算子(Count,Max,Min,Sum)往往结合Group算子和Window算子结合使用。
将上面Group的算子中的range groupedObservable.Observe()代码改成groupedObservable.AverageInt().Observe(),就是一个具体的Average算子。

Reduce

ReduceObservable中的每个事件apply一个function,并将其最后一个事件发送出去,与Scan算子的区别是Scan算子是将其所有事件发出,而Reduce只发出最后一个事件。示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import (
"context"
"fmt"
"github.com/reactivex/rxgo/v2"
)

func main() {

observable := rxgo.Just(1, 2, 3)().
Reduce(func(_ context.Context, acc interface{}, elem interface{}) (interface{}, error) {
if acc == nil {
return elem, nil
}
return acc.(int) + elem.(int), nil
})

for item := range observable.Observe() {
fmt.Println(fmt.Sprintf("item:%v", item.V))
}
}

输出结果如下:

1
item:6

对比下Scan算子,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import (
"context"
"fmt"
"github.com/reactivex/rxgo/v2"
)

func main() {

observable := rxgo.Just(1, 2, 3)().
Scan(func(_ context.Context, acc interface{}, elem interface{}) (interface{}, error) {
if acc == nil {
return elem, nil
}
return acc.(int) + elem.(int), nil
})

for item := range observable.Observe() {
fmt.Println(fmt.Sprintf("item:%v", item.V))
}
}

输出结果如下:

1
2
3
item:1
item:3
item:6

对比之后发现Reduce算子只输出了6,而Scan算子输出了1、3、6。

Reduce算子也往往结合Group算子一起使用。