RxSwift Subjects
什么是 Subject
Subject 同时表现为 observable 序列和 observer 订阅者,即它可以接收事件,在接收事件之后会将事件传递给它的订阅者
example(of: "PublishSubject") {
let subject = PublishSubject<String>()
subject.onNext("Is anyone listening?") // observer 的身份接收事件
// observable 的身份被订阅
let subscriptionOne = subject
.subscribe(onNext: { string in
print(string)
})
subject.on(.next("1"))
subject.onNext("2")
}
// 订阅者只会接收到在开始订阅之后 subject 接收的事件,所以 "Is anyone listening?" 不会打印
输出:
--- Example of: PublishSubject ---
1
2
RxSwift 中有四种 subject 类型和两种 relay 类型,relay 类型是对 subject 类型的封装
- PublishSubject: 以空白开始,只会将新的元素发送给订阅者
- BehaviorSubject: 以一个初始值开始,对新的订阅者会重发初始值或者最后一个元素
- ReplaySubject: 以一个缓冲大小开始,对新的订阅会重发初始值或者最后缓冲大小的元素
- AsyncSubject: 只在收到
.completed
事件时发送最后一个.next
事件,很少用到 - PublishRelay 和 BehaviorRelay: 它们各自封装了对应的 subject,但是只能接收
.next
事件,不能接收.completed
或者.error
事件,所以他们非常适合用在不会终止的序列上
使用 publish subject
在上面的例子中新增代码
let subscriptionTwo = subject
.subscribe { event in
print("2)", event.element ?? event)
}
subject.onNext("3")
输出:
3
2) 3
当一个订阅 dispose 之后将不会再接收新的事件
subscriptionOne.dispose()
subject.onNext("4")
输出:
2) 4
当一个 publish subject 接收了一个终止事件之后( .completed
或者 .error
),它将会重发该终止事件到新的订阅者并且不会再发出 .next
事件
新增代码
// 1
subject.onCompleted()
// 2
subject.onNext("5")
// 3
subscriptionTwo.dispose()
let disposeBag = DisposeBag()
// 4
subject
.subscribe {
print("3)", $0.element ?? $0)
}
.disposed(by: disposeBag)
subject.onNext("?")
输出:
2) completed
3) completed
最好在 publish subject 的订阅者中处理终止事件,以免在订阅的时候 publish subject 已经终止
使用 behavior subject
behavior subject 跟 publish subject 类似,但是会重放最后一个事件
// 1
enum MyError: Error {
case anError
}
// 2
func print<T: CustomStringConvertible>(label: String, event: Event<T>) {
print(label, (event.element ?? event.error) ?? event)
}
// 3
example(of: "BehaviorSubject") {
// 4
let subject = BehaviorSubject(value: "Initial value") // 因为要重放最后一个,所以必须有初始值
let disposeBag = DisposeBag()
subject
.subscribe {
print(label: "1)", event: $0)
}
.disposed(by: disposeBag)
}
输出:
--- Example of: BehaviorSubject ---
1) Initial value
将下面代码插入到 subject 的定义和订阅之间,输出会有变化
subject.onNext("X")
输出:
--- Example of: BehaviorSubject ---
1) X
在例子的最后添加代码
// 1
subject.onError(MyError.anError)
// 2
subject
.subscribe {
print(label: "2)", event: $0)
}
.disposed(by: disposeBag)
输出:
1) anError
2) anError
behavior subject 的一个使用场景是你想使用最近的数据展示在页面上,那么可以将页面的控件绑定到 behavior subject 上,在数据刷新的时候就会展示最近的结果
使用 replay subject
replay subject 和 behavior subject 类似,但是不是重放最后一个事件,而是重放最后的 n 个事件,称之为 buffer
buffer 会被保存在内存中,不合理的 buffer 可能会使用大量的内存,比如图片
如果 subject 的类型是数组,buffer 同样可能造成大量内存占用
example(of: "ReplaySubject") {
// 1
let subject = ReplaySubject<String>.create(bufferSize: 2)
let disposeBag = DisposeBag()
// 2
subject.onNext("1")
subject.onNext("2")
subject.onNext("3")
// 3
subject
.subscribe {
print(label: "1)", event: $0)
}
.disposed(by: disposeBag)
subject
.subscribe {
print(label: "2)", event: $0)
}
.disposed(by: disposeBag)
}
输出:
--- Example of: ReplaySubject ---
1) 2
1) 3
2) 2
2) 3
末尾新增代码
subject.onNext("4")
subject
.subscribe {
print(label: "3)", event: $0)
}
.disposed(by: disposeBag)
输出:
1) 4
2) 4
3) 3
3) 4
在 subject.onNext("4")
之后增加终止代码
subject.onNext("4")
subject.onError(MyError.anError)
subject
.subscribe {
print(label: "3)", event: $0)
}
.disposed(by: disposeBag)
输出:
1) 4
2) 4
1) anError
2) anError
3) 3
3) 4
3) anError
replay subject 即使收到了终止事件,依旧会保持 buffer,会对新的订阅者重放包括终止事件在内的 buffer
但是如果显式对 subject 调用 dispose,则新的订阅者只会收到 subject 已终止的信息
紧挨上面的 subject.onError(MyError.anError)
之后调用 dispose
subject.dispose()
subscribe 3 输出:
3) Object `RxSwift.ReplayMany<Swift.String>` was already disposed.
使用 relay
relay 的特点是只能新增 value 事件,不能新增 error 或者 completed 事件,所以对于 relay 可以使用 accept(_:)
方法而不是 connect(_:)
方法
PublishRelay 封装了 PublishSubject,BehaviorRelay 封装了 BehaviorSubject
PublishRelay
example(of: "PublishRelay") {
let relay = PublishRelay<String>()
let disposeBag = DisposeBag()
relay.accept("Knock knock, anyone home?")
relay
.subscribe(onNext: {
print($0)
})
.disposed(by: disposeBag)
relay.accept("1")
}
输出:
--- Example of: PublishRelay ---
1
PublishRelay 是对 PublishSubject 的封装,所以没有重放
如果调用 relay.accept(MyError.anError) 或者 relay.onCompleted() 会触发编译错误
BehaviorRelay
example(of: "BehaviorRelay") {
// 1
let relay = BehaviorRelay(value: "Initial value")
let disposeBag = DisposeBag()
// 2
relay.accept("New initial value")
// 3
relay
.subscribe {
print(label: "1)", event: $0)
}
.disposed(by: disposeBag)
}
输出:
--- Example of: BehaviorRelay ---
1) New initial value
BehaviorRelay 是对 BehaviorSubject 的封装,所以会重放最后一个事件
在末尾添加新的事件和订阅
// 1
relay.accept("1")
// 2
relay
.subscribe {
print(label: "2)", event: $0)
}
.disposed(by: disposeBag)
// 3
relay.accept("2")
输出:
1) 1
2) 1 // subscrib 2 订阅之后的重放
1) 2
2) 2
可以通过 behavior relay 的 value 属性获取最近的值
print(relay.value)
输出:
2
留下评论