ReactiveX简称为Rx,是基于事件流的一个异步编程API,其中事件流被称为Observable,事件流需要被Observer订阅才有意义。
Rx有多种编程语言实现,RxJava/RxJS/Rx.NET/RxClojure/RxSwift。其中RxGo是Rx的Go语言实现。
下面就来介绍下RxGo中常用的算子使用样例。
Observable & Observe
RxGO中事件流被抽象成Observable
,Observable
中的事件需要被订阅之后才能被消费,使用Observe
进行订阅。
下面是段创建事件流和订阅的代码段:
1 2 3 4
| observable := rxgo.Just("Hello, World!")() ch := observable.Observe() item := <-ch fmt.Println(item.V)
|
Buffer
根据某个规则将Observable
中的事件缓存下来然后一起发送出去。这个规则可以是根据事件的条数(BufferWithCount
)或者根据时间间隔(BufferWithTime
),当然也可以根据时间间隔和事件条数(BufferWithTimeOrCount
)。
下面的样例是根据事件条数的缓存规则进行发送事件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| package main
import ( "fmt" "github.com/reactivex/rxgo/v2" "time" )
func main() { ch := make(chan rxgo.Item, 1) go func() { i := 0 for range time.Tick(time.Second) { ch <- rxgo.Of(i) i++ } }()
observable := rxgo.FromChannel(ch).BufferWithCount(3)
for item := range observable.Observe() { fmt.Println(item.V) } }
|
每秒向channel中发送一个数字,Observable
缓存3个,然后发出,输出结果如下:
1 2 3 4
| [0 1 2] [3 4 5] [6 7 8] [9 10 11]
|
观察输出的内容可以看出item.V
是一个[]interface{}
,如果不使用Buffer
算子的话,item.V
是一个interface{}
,所以Buffer
算子其后再接其他算子的时候需要注意其事件是一个Slice
,而不再是一个普通的interface{}
FlatMap
FlatMap
是将Observable
中的事件解析成一个Observable
,代码示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| package main
import ( "fmt" "github.com/reactivex/rxgo/v2" "strings" )
func main() { observable := rxgo.Just("Hello, World!", "World", "Golang World")().FlatMap(func(item rxgo.Item) rxgo.Observable { obs := rxgo.Just(strings.Split(item.V.(string), " "))() return obs })
for item := range observable.Observe() { fmt.Println(item.V) } }
|
FlatMap
将每个事件根据" "
进行拆分成一个slice
,并将其封装成一个Observable
发送出去。其输出内容如下:
1 2 3 4 5
| Hello, World! World Golang World
|
Map
Map
是将Observable
中的一个事件封装成另一个事件,发送出去。使用方式较为简单,其示例代码如下:
1 2 3 4
| observable := rxgo.Just(1, 2, 3)(). Map(func(_ context.Context, i interface{}) (interface{}, error) { return i.(int) * 10, nil })
|
输出如下内容:
Scan
对Observable
中的一个事件apply一个function,并将其处理的结果发送出去,其使用方式类似Reduce
算子,示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| package main
import ( "context" "fmt" "github.com/reactivex/rxgo/v2" )
func main() {
observable := rxgo.Just("a", "b", "c", "d")(). Scan(func(ctx context.Context, i interface{}, i2 interface{}) (interface{}, error) { if i == nil { return i2, nil } return fmt.Sprintf("%s,%s", i.(string), i2.(string)), nil })
for item := range observable.Observe() { fmt.Println(item.V) } }
|
示例是将目前发送过的事件进行拼接,然后进行发送。其输出结果如下:
GroupByDynamic
根据规则将Observable
中的事件进行分组(分组个数没有限制),然后将其分组结果封装成GroupedObservables
发送出去。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| package main
import ( "fmt" "github.com/reactivex/rxgo/v2" "strconv" "time" )
func main() {
observable := rxgo.Range(0, 24).GroupByDynamic(func(item rxgo.Item) string { return strconv.Itoa(item.V.(int) % 2) }, rxgo.WithBufferedChannel(12))
for i := range observable.Observe() { groupedObservable := i.V.(rxgo.GroupedObservable) key := groupedObservable.Key fmt.Printf("New observable: %s\n", key)
for i := range groupedObservable.Observe() { fmt.Printf("key: %s, item: %v\n", key, i.V) } } }
|
GroupByDynamic
中的func
定义了分组的规则strconv.Itoa(item.V.(int) % 2)
,也就是说分组的key就是strconv.Itoa(item.V.(int) % 2)
,Observe
之后从channel中接收GroupedObservable
,然后再进行数据获取。其输出结果如下:
1 2 3 4 5 6 7 8
| New observable: 0 key: 0, item: 0 ... key: 0, item: 22 New observable: 1 key: 1, item: 1 ... key: 1, item: 23
|
不过这里有个buff需要注意下,GroupByDynamic
中传入了rxgo.WithBufferedChannel(12)
,这个buff是用来做什么的呢?长度又是怎么定义的呢?
通过源码可得知这个buff是一个channel,是用来存储分组中的元素的。在上面的例子中如果buff的长度设置的小于12,其程序会被block住。之所以会被block住是因为当前程序从Observe
中订阅数据是串行的,只消费了0号分组中buff中的数据,而1号分组中buff的数据并没有消费,其channel已填满数据,无法再写入数据,所以程序被block住了。
所以这个长度之所以是12,是因为24个数,分2组,每组12个,buff的长度又正好是12个,串行消费数据并不会造成channel阻塞。
但是实际使用过程中并不能准备评估每组的个数,那这个buff又该怎么设置呢?
解决方法是协程来订阅GroupedObservable
,调整后的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| package main
import ( "fmt" "github.com/reactivex/rxgo/v2" "time" )
func main() {
observable := rxgo.Range(0, 24).GroupByDynamic(func(item rxgo.Item) string { if item.V.(int) <= 5 { return "1" } else if item.V.(int) <= 11 { return "2" } else if item.V.(int) <= 17 { return "1" } else if item.V.(int) <= 23 { return "2" } else { return "3" } }, rxgo.WithBufferedChannel(2))
for i := range observable.Observe() { go func() { groupedObservable := i.V.(rxgo.GroupedObservable) key := groupedObservable.Key fmt.Printf("New observable: %s\n", key)
for i := range groupedObservable.Observe() { fmt.Printf("key: %s, item: %v\n", key, i.V) } }()
}
time.Sleep(1 * time.Hour) }
|
使用协程来订阅GroupedObservable
,每个分组中的数据都能被及时消费掉,并不会造成buff阻塞,所以这里buff只是设置成rxgo.WithBufferedChannel(2)
。
Window
Window
算子较为简单,根据事件个数或者时间间隔将窗口中的事件发送到一个Observable
中。其中Window
算子与Buffer
算子效果上类似,只是Window
是将窗口中的事件封装成一个Observable
,而Buffer
算子是将缓存中的事件当成一个新的事件发出。
Window
算子示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| package main
import ( "fmt" "github.com/reactivex/rxgo/v2" )
func main() { observe := rxgo.Just(1, 2, 3, 4, 5)().WindowWithCount(2).Observe()
for item := range observe { fmt.Println("new obs...") for i := range item.V.(rxgo.Observable).Observe() { fmt.Println(i.V) } } }
|
输出的结果如下:
1 2 3 4 5 6 7 8
| new obs... 1 2 new obs... 3 4 new obs... 5
|
Filtering算子
Filter
Filter
算子用于过滤符合条件的事件,使用方式较为简单,其代码如下:
1 2 3 4
| observable := rxgo.Just(1, 2, 3)(). Filter(func(i interface{}) bool { return i != 2 })
|
Combining算子
Join
Join
是将两个Observable
中规定时间间隔中的事件进行笛卡尔积处理,示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| package main
import ( "context" "fmt" "github.com/reactivex/rxgo/v2" "time" )
func main() {
observable := rxgo.Just( map[string]int64{"tt": 1, "V": 1}, map[string]int64{"tt": 4, "V": 2}, map[string]int64{"tt": 7, "V": 3}, )().Join(func(ctx context.Context, l interface{}, r interface{}) (interface{}, error) { return map[string]interface{}{ "l": l, "r": r, }, nil }, rxgo.Just( map[string]int64{"tt": 2, "V": 5}, map[string]int64{"tt": 3, "V": 6}, map[string]int64{"tt": 5, "V": 7}, )(), func(i interface{}) time.Time { return time.Unix(0, i.(map[string]int64)["tt"]*1000000) }, rxgo.WithDuration(2*time.Millisecond))
for item := range observable.Observe() { fmt.Println(fmt.Sprintf("item:%v", item.V)) } }
|
Join
算子是将固定时间间隔中的事件进行处理,这个间隔是由rxgo.WithDuration(2*time.Millisecond)
控制的,时间获取方式是又第三个参数func(i interface{}) time.Time
方法决定的。
两条流中的事件是否进行join,是事件根据func(i interface{}) time.Time
获取时间,两者时间的差值是否在rxgo.WithDuration(2*time.Millisecond)
中,如果再则join。
上例中左流中的"tt": 1
与右流中的"tt": 2
和"tt": 3
进行join,不与"tt": 5
进行join是因为5-1=4>rxgo.WithDuration(2*time.Millisecond)
。
其输出结果如下:
1 2 3 4 5 6
| item:map[l:map[V:1 tt:1] r:map[V:5 tt:2]] item:map[l:map[V:1 tt:1] r:map[V:6 tt:3]] item:map[l:map[V:2 tt:4] r:map[V:5 tt:2]] item:map[l:map[V:2 tt:4] r:map[V:6 tt:3]] item:map[l:map[V:2 tt:4] r:map[V:7 tt:5]] item:map[l:map[V:3 tt:7] r:map[V:7 tt:5]]
|
Mathematical和Aggregate算子
Average
计算Observable
中事件的平均值,示例代码如下:
1
| observable := rxgo.Just(1, 2, 3, 4)().AverageInt()
|
实际使用中,由于Observable
都是Hot Observable
,所以Average
类的算子(Count
,Max
,Min
,Sum
)往往结合Group
算子和Window
算子结合使用。
将上面Group
的算子中的range groupedObservable.Observe()
代码改成groupedObservable.AverageInt().Observe()
,就是一个具体的Average
算子。
Reduce
Reduce
对Observable
中的每个事件apply一个function,并将其最后一个事件发送出去,与Scan
算子的区别是Scan
算子是将其所有事件发出,而Reduce
只发出最后一个事件。示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| package main
import ( "context" "fmt" "github.com/reactivex/rxgo/v2" )
func main() {
observable := rxgo.Just(1, 2, 3)(). Reduce(func(_ context.Context, acc interface{}, elem interface{}) (interface{}, error) { if acc == nil { return elem, nil } return acc.(int) + elem.(int), nil })
for item := range observable.Observe() { fmt.Println(fmt.Sprintf("item:%v", item.V)) } }
|
输出结果如下:
对比下Scan
算子,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| package main
import ( "context" "fmt" "github.com/reactivex/rxgo/v2" )
func main() {
observable := rxgo.Just(1, 2, 3)(). Scan(func(_ context.Context, acc interface{}, elem interface{}) (interface{}, error) { if acc == nil { return elem, nil } return acc.(int) + elem.(int), nil })
for item := range observable.Observe() { fmt.Println(fmt.Sprintf("item:%v", item.V)) } }
|
输出结果如下:
对比之后发现Reduce
算子只输出了6,而Scan
算子输出了1、3、6。
Reduce
算子也往往结合Group
算子一起使用。