您好,欢迎访问三七文档
当前位置:首页 > 商业/管理/HR > 资本运营 > 观察者模式的极致,RxJava运行流程及源码简析
观察者模式的极致,RxJava运行流程及源码简析RxJava,ReactiveExtensionsfortheJVM。Github:生产者obObservable.onSubscribe---------------生产者的任务表onSubSubscriber--------------------------消费者subObservable.subscribe(Subscriber);------订阅1,最基本的流程(以下省略适配、优化等细节代码)observableAob=observable.create(newonSubscribe(){@Overridepublicvoidcall(Subscriber?superStringsubscriber){.......}});//Aob创建完毕,持有一个onSubscribe对象,此时是“冷”的,只是一个observable实例,Aob.subscribe(outSubscriber){...........outSubscriber.onStart();onSubscribe.call(outSubscriber);........};//调用Aob.subscribe(outSubscriber);方法,打开observable的调用链,//内部调用的是Aob.onSubscribe.call(outSubscriber)://将outSubscriber传入onSubscribe.Call(outSubscriber),相当于启动onSubscribe,onSubscribe将数据传递给outSubscriber。容我画个草图:(1)observableAob=observable.create(AonSub);———————————|Aob||持有||AonSub|———————————(2)Aob.subscribe(outSubscriber)—————————————————————————————————|Aob||调用||AonSub.Call(outSubscriber)|—————————————————————————————————(3)AonSub.Call(outSubscriber)———————————————————————————————————————|result为处理结果||outSubscriber.onNext(result)||outSubscriber.onCompleted()||outSubscriber.onError(Throwablee)|———————————————————————————————————————.2,map、flatmap、lift的调用流程(以下省略适配、优化等细节代码)//创建observableobservableAob=observable......//调用Aob.flatMapAob.flatMap(newFunc1String,ObservableString(){@OverridepublicObservableStringcall(Strings){.....}}).subscribe(outSubscriber);//绑定使用上,调用flatMap操作符后返回observable,紧接着绑定Subscriber,flatMap返回的observable还是Aob?一开始我是这么认为的。//flatMap()的返回ObservablepublicfinalObservableflatMap(Func1func){....//merge(Observable)操作符将多个Observable合为一,并异步、无序的发射Observable生产的数据,先不管它。//merge参数为Observable,那map(func)肯定返回Observable。returnmerge(map(func));}//map()也返回ObservablepublicfinalObservablemap(Func1func){//越调越远,lift()同样返回一个Observable,OperatorMap是?returnlift(newOperatorMapT,R(func));}到这先停一下,看看flatMap流程(先忽略merge,涉及到flatMap特性):flatMap(func)--map(func)--lift(OperatorMap(func));如果用map()操作符,那流程就是:map(func)--lift(OperatorMap(func));核心就在lift与参数OperatorMap对象中。//OperatorMap实现了Operator,Operator是?classOperatorMapT,RimplementsOperatorR,T;//Operator继承自Func1,并设定Func1的Call方法参数为Subscriber,返回值也是SubscriberinterfaceOperatorR,TextendsFunc1Subscriber?superR,Subscriber?superTOperatorMap实现了Operator,只重写一个Func1.Call方法.先大概看一下OperatorMap是什么鬼publicfinalclassOperatorMapT,RimplementsOperatorR,T{//在调用flatMap传入的Func1是用来表示某种规则和逻辑,用来过滤或“先”处理,最后会传到这里finalFunc1?superT,?extendsRtransformer;//构造方法传入Func1,就是flatMap传入的Func1publicOperatorMap(Func1?superT,?extendsRtransformer){//赋值成员变量this.transformer=transformer;}@Override//实现自Operator,Operator又继承自Func1,call方法参数为Subscriber,返回Subscriber。publicSubscriber?superTcall(finalSubscriber?superRo){//new一个Subscriber并返回returnnewSubscriberT(o){....};}}到这,知道了OperatorMap实现了Operator,重写Func1.Call方法,并持有了flatMap(Func1)方法中传入的Func1。至于call方法中传入的Subscriber与返回的Subscriber干嘛用的,不急,先看lift(Operator)。//参数Operator,map()中传入的是OperatorMap,返回ObservablepublicfinalRObservableRlift(finalOperator?extendsR,?superToperator){//直接newObservable并返回,这个构建方法是protected的returnnewObservableR(//Observable少不了OnSubnewOnSubscribeR(){@Overridepublicvoidcall(Subscriber?superRo){......}});}依照之前的调用流程,一路飙到lift(Operator)。flatMap(func)--map(func)--lift(OperatorMap(func));lift(Operator)返回了一个newObservable,则返回流程是:returnnewObservable--map(func)--flatMap(func)--前台;所以:Aob.flatMap(...).subscribe(outSubscriber);等价于ObservableBob=Aob.flatMap(...);Bob.subscribe(outSubscriber);在最基本的运行流程中,ob.subscribe(outSubscriber)会激活OnSubscriber,将outSubscriber传入OnSubscriber的call方法中,并经处理后的数据通过outSubscriber.onNext(result)等方法通知前台。下面我用Aob与Bob解释lift(),看看lift()返回的Observable.onSubscriber.call都做了什么//只留核心代码//参数Operator,map()中传入OperatorMap,返回ObservablepublicObservablelift(operator){//直接newObservable并返回,这个构建方法是protected的returnnewObservableR(//Observable少不了OnSubnewOnSubscribeR(){//参数o就是调用Bob.subscribe(outSubscriber)传入的outSubscriberpublicvoidcall(Subscriber?superRo){.......//hook.onLift(operator)方法直接将operator返回,真的就一行代码。//operator就是map()中传入的OperatorMap。//OperatorMap.call()方法传入一个Subscriber,返回一个Subscriber;Subscriber?superTst=hook.onLift(operator).call(o);//将OperatorMap.call()返回的Subscriber传给Aob.onSubscribe.call();onSubscribe.call(st);.........}});}1,Bob.subscribe(outSubscriber);|2,Bob.OnSubscribe.call(outSubscriber);|3,将outSubscriber传给OperatorMap.call(outSubscriber),返回另一个Subscriber,交给Aob.onSubscriber.call()处理。|4,没了WTF?没outSubscriber什么事了?Aob.onSubscriber处理完数据都交给另一个Subscriber处理了,outSubscriber干嘛去了?回头看看OperatorMap.call()方法publicfinalclassOperatorMapT,RimplementsOperatorR,T{...........@Override//outSubscriber传给OperatorMap.call(outSubscriber),参数o就是outSubscriberpublicSubscriber?superTcall(finalSubscriber?superRo){//new一个Subscriber并返回returnnewSubscriberT(o){//调用outSubscriber.onCompleted();@OverridepublicvoidonCompleted(){o.onCompleted();}//调用outSubscriber.onError(e);@OverridepublicvoidonError(Throwablee){o.onError(e);}@OverridepublicvoidonNext(Tt){try{//transformer就是.flatMap时传入的Func1//t是Aob.o
本文标题:观察者模式的极致,RxJava运行流程及源码简析
链接地址:https://www.777doc.com/doc-3721957 .html