日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RxSwift之深入解析map操作符的底层实现

發(fā)布時間:2024/5/21 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RxSwift之深入解析map操作符的底层实现 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

一、map 操作符的使用

  • map 操作符將源 Observable 的每個元素轉(zhuǎn)換一遍,然后返回含有轉(zhuǎn)換結(jié)果的 Observable:

  • 現(xiàn)有如下示例:
Observable<Int>.of(1,2,3,4,5,6).subscribe(onNext: { (val) inprint(val)}).disposed(by: disposeBag) // 執(zhí)行結(jié)果 123456
  • 在 map 操作:
Observable<Int>.of(1,2,3,4,5,6).map{$0+10}.subscribe(onNext: { (val) inprint(val)}).disposed(by: disposeBag) // 執(zhí)行結(jié)果 111213141516
  • 說明:
    • of 初始化序列,序列元素類型需保存一致;
    • map操作符,操作序列每個元素加 10 后作為新元素,構(gòu)成新的序列。
  • 那么,map 是如何給序列重新設(shè)置新值的呢?

二、map 源碼分析

① map 函數(shù)的定義

    • map 閉包就像加工的機(jī)器,設(shè)定好加工程序 $0+10 就會對 of 中的每一個元素加工產(chǎn)出新的零件,首先來看一下 map 源碼都處理了哪些業(yè)務(wù):
extension ObservableType {public func map<R>(_ transform: @escaping (E) throws -> R)-> Observable<R> {return self.asObservable().composeMap(transform)} }
  • 分析:
    • transform 逃逸閉包,轉(zhuǎn)換邏輯交給業(yè)務(wù)層;
    • asObservable() 保證協(xié)議的一致性。
  • 可以看到 map 函數(shù)是一個帶閉包參數(shù)的 ObservableType 的擴(kuò)展函數(shù),內(nèi)部調(diào)用了 composeMap 并傳入外部的閉包以便內(nèi)部調(diào)用。我們猜測,該處閉包會被保留在內(nèi)部,在訂閱時被使用,那么根據(jù)斷點繼續(xù)探索,看看外界的閉包最終會保留在何處。
  • composeMap 所在類,如下所示:
    • source 向 _map 函數(shù)傳入了 self 即為當(dāng)前的序列對象;
    • transform 追蹤的外部閉包。
public class Observable<Element> : ObservableType {// Type of elements in sequence.public typealias E = Element......internal func composeMap<R>(_ transform: @escaping (Element) throws -> R) -> Observable<R> {return _map(source: self, transform: transform)} }
  • 可以看到,ObservableType 的子類 Observable 實現(xiàn) composeMap 方法,返回 Observable 類型的對象,在內(nèi)部調(diào)用了 _map 方法:
internal func _map<Element, R>(source: Observable<Element>, transform: @escaping (Element) throws -> R) -> Observable<R> {return Map(source: source, transform: transform) }
  • 繼續(xù)向 Map 內(nèi)部傳入序列,及業(yè)務(wù)層閉包,一直強調(diào)序列和業(yè)務(wù)層閉包,主要由于結(jié)構(gòu)復(fù)雜,以免被遺忘,后續(xù)和訂閱難以被聯(lián)系在一起。

② Map 類

  • 查看 Map 類,如下:
    • Map 繼承自 Producer,而 Producer 繼承自 Observable,提供了連接序列和觀察者的方法對象 sink,及發(fā)送序列元素到觀察者,再返回到訂閱;
    • Map 中保留源序列及業(yè)務(wù)層閉包方法;
    • run 方法會在父類 Producer 類中方法調(diào)用,父類指針指向子類對象。
final private class Map<SourceType, ResultType>: Producer<ResultType> {typealias Transform = (SourceType) throws -> ResultTypeprivate let _source: Observable<SourceType>private let _transform: Transforminit(source: Observable<SourceType>, transform: @escaping Transform) {self._source = sourceself._transform = transform#if TRACE_RESOURCES_ = increment(&_numberOfMapOperators) #endif}override func composeMap<R>(_ selector: @escaping (ResultType) throws -> R) -> Observable<R> {let originalSelector = self._transformreturn Map<SourceType, R>(source: self._source, transform: { (s: SourceType) throws -> R inlet r: ResultType = try originalSelector(s)return try selector(r)})}override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == ResultType {let sink = MapSink(transform: self._transform, observer: observer, cancel: cancel)let subscription = self._source.subscribe(sink)return (sink: sink, subscription: subscription)} }

③ 訂閱

  • 繼續(xù)斷點運行就到達(dá)訂閱,該處方法與 RxSwift 之深入解析核心邏輯 Observable 的底層原理中的訂閱方法為同一方法:
extension ObservableType {// 業(yè)務(wù)層訂閱調(diào)用public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)-> Disposable {let disposable: Disposableif let disposed = onDisposed {disposable = Disposables.create(with: disposed)}else {disposable = Disposables.create()}#if DEBUGlet synchronizationTracker = SynchronizationTracker()#endiflet callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []let observer = AnonymousObserver<E> { event in#if DEBUGsynchronizationTracker.register(synchronizationErrorMessage: .default)defer { synchronizationTracker.unregister() }#endifswitch event {case .next(let value):onNext?(value)case .error(let error):if let onError = onError {onError(error)}else {Hooks.defaultErrorHandler(callStack, error)}disposable.dispose()case .completed:onCompleted?()disposable.dispose()}}return Disposables.create(self.asObservable().subscribe(observer),disposable)} }
  • self.asObservable().subscribe(observer) 此處調(diào)用的則是 Producer 中的 subscribe 方法,該處方法實現(xiàn)邏輯如下:
class Producer<Element> : Observable<Element> {override init() {super.init()}override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {if !CurrentThreadScheduler.isScheduleRequired {// The returned disposable needs to release all references once it was disposed.let disposer = SinkDisposer()let sinkAndSubscription = self.run(observer, cancel: disposer)disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)return disposer}else {return CurrentThreadScheduler.instance.schedule(()) { _ inlet disposer = SinkDisposer()let sinkAndSubscription = self.run(observer, cancel: disposer)disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)return disposer}}} }

④ run 方法

  • 繼續(xù)查看內(nèi)部 self.run 方法調(diào)用,它的繼承鏈與 RxSwift之深入解析核心邏輯Observable的底層原理 中的繼承鏈有所不同,它們的繼承鏈對比如下:
    • RxSwift 核心邏輯中的 Producer 的子類是 AnonymousObservable,run方法在此類實現(xiàn);
    • Map 源碼中 Producer 的子類是 Map,run 方法在該處被實現(xiàn)。
  • run 方法的實現(xiàn)如下:
    • MapSink 方法和 RxSwift 核心邏輯中的 AnnonymousObservableSink 類似;
    • self._source 此處為訂閱時保存的閉包;
    • .subscribe(sink)Producer 類的方法,傳入 sink 用來調(diào)用 sink 中的 on 方法。
override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == ResultType {let sink = MapSink(transform: self._transform, observer: observer, cancel: cancel)let subscription = self._source.subscribe(sink)return (sink: sink, subscription: subscription) }
  • MapSink 中保留的是觀察者,Map 中保留的為可觀察序列 Observable,通過 Observable 來觸發(fā)觀察者的方法調(diào)用,subscribe 方法中調(diào)用的 sinkAndSubscription = self.run(observer, cancel: disposer):
final private class ObservableSequence<S: Sequence>: Producer<S.Iterator.Element> {fileprivate let _elements: Sfileprivate let _scheduler: ImmediateSchedulerTypeinit(elements: S, scheduler: ImmediateSchedulerType) {self._elements = elementsself._scheduler = scheduler}override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == E {let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)let subscription = sink.run()return (sink: sink, subscription: subscription)} }
  • ObservableSequence 是繼承自 Producer 的方法,內(nèi)部創(chuàng)建了 ObservableSequenceSink 對象并傳入了當(dāng)前 Observable 對象和 observer 對象,最后調(diào)用 run() 方法,此處內(nèi)部為變量序列并調(diào)用觀察者閉包方法,向外界發(fā)送消息。ObservableSequence 類繼承自 Sink,由此可知會調(diào)用 Sink 中的 forwardOn 方法,實現(xiàn)如下:
final private class ObservableSequenceSink<S: Sequence, O: ObserverType>: Sink<O> where S.Iterator.Element == O.E {typealias Parent = ObservableSequence<S>private let _parent: Parentinit(parent: Parent, observer: O, cancel: Cancelable) {self._parent = parentsuper.init(observer: observer, cancel: cancel)}func run() -> Disposable {return self._parent._scheduler.scheduleRecursive(self._parent._elements.makeIterator()) { iterator, recurse invar mutableIterator = iteratorif let next = mutableIterator.next() {self.forwardOn(.next(next))recurse(mutableIterator)}else {self.forwardOn(.completed)self.dispose()}}} }
  • _elements 是由 of 創(chuàng)建時保留的序列集合,此處對序列元素進(jìn)行遍歷,并調(diào)用 forwardOn 方法發(fā)送元素。forwardOn 的實現(xiàn)如下,_observer 是上面?zhèn)魅氲?MapSink 對象:
class Sink<O : ObserverType> : Disposable {fileprivate let _observer: Ofileprivate let _cancel: Cancelablefileprivate var _disposed = AtomicInt(0)#if DEBUGfileprivate let _synchronizationTracker = SynchronizationTracker()#endifinit(observer: O, cancel: Cancelable) { #if TRACE_RESOURCES_ = Resources.incrementTotal() #endifself._observer = observerself._cancel = cancel}final func forwardOn(_ event: Event<O.E>) {#if DEBUGself._synchronizationTracker.register(synchronizationErrorMessage: .default)defer { self._synchronizationTracker.unregister() }#endifif isFlagSet(&self._disposed, 1) {return}self._observer.on(event)} }
  • 可以看到,此處調(diào)用了 sink 的 on 方法,self._observer.on(event)。繼續(xù)追蹤 MapSink 類的 on 方法實現(xiàn):
final private class MapSink<SourceType, O: ObserverType>: Sink<O>, ObserverType {typealias Transform = (SourceType) throws -> ResultTypetypealias ResultType = O.Etypealias Element = SourceTypeprivate let _transform: Transforminit(transform: @escaping Transform, observer: O, cancel: Cancelable) {self._transform = transformsuper.init(observer: observer, cancel: cancel)}func on(_ event: Event<SourceType>) {switch event {case .next(let element):do {let mappedElement = try self._transform(element)self.forwardOn(.next(mappedElement))}catch let e {self.forwardOn(.error(e))self.dispose()}case .error(let error):self.forwardOn(.error(error))self.dispose()case .completed:self.forwardOn(.completed)self.dispose()}} }
  • 至此,就容易理解了,這里的 on 和 RxSwift 核心邏輯中的不同:
    • RxSwift 核心邏輯中此處由業(yè)務(wù)層 onNext 來觸發(fā);
    • Map 中是通過設(shè)定好的 of 序列直接觸發(fā)。
  • 元素處理:
    • let mappedElement = try self._transform(element) 調(diào)用外界閉包獲取新值;
    • self.forwardOn(.next(mappedElement)) 通過 forwardOn 將新值發(fā)送至訂閱者。
  • 最終會調(diào)用 ObserverBase 中的 on 方法,再調(diào)用觀察者 observer 的 onCore 方法,向觀察者發(fā)送元素。在由觀察者調(diào)用業(yè)務(wù)層訂閱時實現(xiàn)的閉包將序列元素發(fā)送到了業(yè)務(wù)層,到此 map 就完成了對源序列的修改。

三、總結(jié)

  • map 是對 sink 做了一層封裝,根據(jù)業(yè)務(wù)層的 map 設(shè)置在 ObservableSequenceSink 中處理了序列元素再發(fā)送至 forwardOn 直至 Observer 對象,由此完成了對元素的加工處理。
  • RxSwift 源碼比較繁瑣,復(fù)雜的邏輯帶來的是高效的開發(fā),高效的運行,因此對 RxSwfit 源碼還需要進(jìn)一步地理解和分析。

總結(jié)

以上是生活随笔為你收集整理的RxSwift之深入解析map操作符的底层实现的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。