RxSwift Subjects

28 分钟读完

什么是 Subject

Subject 同时表现为 observable 序列和 observer 订阅者,即它可以接收事件,在接收事件之后会将事件传递给它的订阅者

example(of: "PublishSubject") {
  let subject = PublishSubject<String>()
  subject.onNext("Is anyone listening?") // observer 的身份接收事件
  
  // observable 的身份被订阅
  let subscriptionOne = subject
  .subscribe(onNext: { string in
    print(string)
  })
  
  subject.on(.next("1"))
  subject.onNext("2")
}

// 订阅者只会接收到在开始订阅之后 subject 接收的事件,所以 "Is anyone listening?" 不会打印
输出
--- Example of: PublishSubject ---
1
2

RxSwift 中有四种 subject 类型和两种 relay 类型,relay 类型是对 subject 类型的封装

  • PublishSubject: 以空白开始,只会将新的元素发送给订阅者
  • BehaviorSubject: 以一个初始值开始,对新的订阅者会重发初始值或者最后一个元素
  • ReplaySubject: 以一个缓冲大小开始,对新的订阅会重发初始值或者最后缓冲大小的元素
  • AsyncSubject: 只在收到 .completed 事件时发送最后一个 .next 事件,很少用到
  • PublishRelayBehaviorRelay: 它们各自封装了对应的 subject,但是只能接收 .next 事件,不能接收 .completed 或者 .error 事件,所以他们非常适合用在不会终止的序列上

使用 publish subject

在上面的例子中新增代码

let subscriptionTwo = subject
  .subscribe { event in
    print("2)", event.element ?? event)
}

subject.onNext("3")

输出
3
2) 3

当一个订阅 dispose 之后将不会再接收新的事件

subscriptionOne.dispose()

subject.onNext("4")

输出
2) 4

当一个 publish subject 接收了一个终止事件之后( .completed 或者 .error ),它将会重发该终止事件到新的订阅者并且不会再发出 .next 事件

新增代码

// 1
subject.onCompleted()

// 2
subject.onNext("5")

// 3
subscriptionTwo.dispose()

let disposeBag = DisposeBag()

// 4
subject
  .subscribe {
    print("3)", $0.element ?? $0)
  }
  .disposed(by: disposeBag)

subject.onNext("?")

输出
2) completed
3) completed

最好在 publish subject 的订阅者中处理终止事件,以免在订阅的时候 publish subject 已经终止

使用 behavior subject

behavior subject 跟 publish subject 类似,但是会重放最后一个事件

// 1
enum MyError: Error {
  case anError
}

// 2
func print<T: CustomStringConvertible>(label: String, event: Event<T>) {
  print(label, (event.element ?? event.error) ?? event)
}

// 3
example(of: "BehaviorSubject") {
  // 4
  let subject = BehaviorSubject(value: "Initial value") // 因为要重放最后一个,所以必须有初始值
  let disposeBag = DisposeBag()
  
  subject
  	.subscribe {
    	print(label: "1)", event: $0)
  	}
  	.disposed(by: disposeBag)
}

输出
--- Example of: BehaviorSubject ---
1) Initial value

将下面代码插入到 subject 的定义和订阅之间,输出会有变化

subject.onNext("X")

输出
--- Example of: BehaviorSubject ---
1) X

在例子的最后添加代码

// 1
subject.onError(MyError.anError)

// 2
subject
  .subscribe {
    print(label: "2)", event: $0)
  }
  .disposed(by: disposeBag)

输出
1) anError
2) anError

behavior subject 的一个使用场景是你想使用最近的数据展示在页面上,那么可以将页面的控件绑定到 behavior subject 上,在数据刷新的时候就会展示最近的结果

使用 replay subject

replay subject 和 behavior subject 类似,但是不是重放最后一个事件,而是重放最后的 n 个事件,称之为 buffer

buffer 会被保存在内存中,不合理的 buffer 可能会使用大量的内存,比如图片

如果 subject 的类型是数组,buffer 同样可能造成大量内存占用

example(of: "ReplaySubject") {
  // 1
  let subject = ReplaySubject<String>.create(bufferSize: 2)
  let disposeBag = DisposeBag()

  // 2
  subject.onNext("1")
  subject.onNext("2")
  subject.onNext("3")

  // 3
  subject
    .subscribe {
      print(label: "1)", event: $0)
    }
    .disposed(by: disposeBag)

  subject
    .subscribe {
      print(label: "2)", event: $0)
    }
    .disposed(by: disposeBag)
}

输出
--- Example of: ReplaySubject ---
1) 2
1) 3
2) 2
2) 3

末尾新增代码

subject.onNext("4")

subject
  .subscribe {
    print(label: "3)", event: $0)
  }
  .disposed(by: disposeBag)

输出
1) 4
2) 4
3) 3
3) 4

subject.onNext("4") 之后增加终止代码

subject.onNext("4")
subject.onError(MyError.anError)

subject
  .subscribe {
    print(label: "3)", event: $0)
  }
  .disposed(by: disposeBag)

输出
1) 4
2) 4
1) anError
2) anError
3) 3
3) 4
3) anError

replay subject 即使收到了终止事件,依旧会保持 buffer,会对新的订阅者重放包括终止事件在内的 buffer

但是如果显式对 subject 调用 dispose,则新的订阅者只会收到 subject 已终止的信息

紧挨上面的 subject.onError(MyError.anError) 之后调用 dispose

subject.dispose()

subscribe 3 输出
3) Object `RxSwift.ReplayMany<Swift.String>` was already disposed.

使用 relay

relay 的特点是只能新增 value 事件,不能新增 error 或者 completed 事件,所以对于 relay 可以使用 accept(_:) 方法而不是 connect(_:) 方法

PublishRelay 封装了 PublishSubjectBehaviorRelay 封装了 BehaviorSubject

PublishRelay

example(of: "PublishRelay") {
  let relay = PublishRelay<String>()
  
  let disposeBag = DisposeBag()
  
  relay.accept("Knock knock, anyone home?")
  
  relay
  	.subscribe(onNext: {
    	print($0)
  	})
  	.disposed(by: disposeBag)

	relay.accept("1")
}

输出
--- Example of: PublishRelay ---
1

PublishRelay 是对 PublishSubject 的封装,所以没有重放

如果调用 relay.accept(MyError.anError) 或者 relay.onCompleted() 会触发编译错误

BehaviorRelay

example(of: "BehaviorRelay") {
  // 1
  let relay = BehaviorRelay(value: "Initial value")
  let disposeBag = DisposeBag()
  
  // 2
  relay.accept("New initial value")
  
  // 3
  relay
    .subscribe {
      print(label: "1)", event: $0)
    }
    .disposed(by: disposeBag)
}

输出:
--- Example of: BehaviorRelay ---
1) New initial value

BehaviorRelay 是对 BehaviorSubject 的封装,所以会重放最后一个事件

在末尾添加新的事件和订阅

// 1
relay.accept("1")

// 2
relay
  .subscribe {
    print(label: "2)", event: $0)
  }
  .disposed(by: disposeBag)

// 3
relay.accept("2")

输出
1) 1
2) 1 // subscrib 2 订阅之后的重放
1) 2
2) 2

可以通过 behavior relay 的 value 属性获取最近的值

print(relay.value)

输出
2

留下评论