首页 > 安全资讯 >

RxJava 源码分析之 —— lift 变换

16-09-14

RxJava 源码分析 lift 变换

写在前面


rxjava 一直很火,我也用了一段时间,感觉特别好用。它属于响应式编程(Reactive Programming,以下简称 RP),脱胎于观察者模式。两者的对比如下:

观察者模式:observable -> observer 响应式编程:observable -> lift1 -> lift2 ->… ->observer

可以看到,RP 的特点是在观察的基础上,加入了传播路径上的变换(lift)。这使得我们可以把数据变换逻辑提取出来,而不必全部杂糅在 observer 里。

关于 lift:我们这么叫是因为,在 rxjava 里面,所有的变换都由一个叫 lift() 的函数实现。具体而言,它可以是 map(), flatMap(), filter(), groupBy() 等等任何变换。

前戏也做足了,有请我们的主角 lift。
以下内容假定你已经熟悉 rxjava 的基本使用。如不熟悉,可以先参看简单的介绍:RxJava 初探(网络请求)

 

一个简单的流程


下边要分析源码了,从简易的场景说起吧:
1. 创建一个 Observable,它包含了数据 “hello rxjava.”。
2. 然后订阅之,用 println() 打印出来。
我们按着数字 1~6 走一下流程:

    // 变量以 Str 结尾时为了提示该变量的范型是 String, 下同。。。
    private Observable.OnSubscribe mOnSubscribeStr = new Observable.OnSubscribe() {
        @Override
        public void call(Subscriber subscriber) { // 3: 接收来自 subscribe()的参数,即 mSubscriberStr
            subscriber.onNext("hello rxjava."); // 4: 生成数据 "hello rxjava.", 下发给订阅者 mSubscriberStr
        }
    };

    private Subscriber mSubscriberStr = new Subscriber() {
        @Override
        public void onCompleted() {}

        @Override
        public void onError(Throwable e) {}

        @Override
        public void onNext(String string) { // 5: 接收来自 mOnSubscribeStr 的数据
            System.out.println(string); // 6: 打印
        }
    };

    private void testSubscribe(){
        Observable.create(mOnSubscribeStr) // 1: 传入 mOnSubscribeStr, 保存之, 相当于传入一个 Callback
                .subscribe(mSubscriberStr); // 2: 传入 mSubscriberStr , 由 OnSubscribe.call() 接收
    }

有几点要注意:
1. 第一步中,mOnSubscribeStr 会被 Observable 保存下来,具体请看源码。
2. 第二步中, subscribe() 里的 mSubscriberStr 参数最终会被传递到 3 处:OnSubscribe.call() 里面

所以,我们有理由相信,subscribe() 的实现基本是:mOnSubscribeStr.call( mSubscriberStr )
事实上确实如此,各种重载版本的 subscribe(), 最终辗转调用到 Observable.subscribe(subscriber, observable)。如图:

private static  Subscription subscribe(Subscriber subscriber, Observable observable) {
    ...
    try {
        // allow the hook to intercept and/or decorate
        hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); 
        // 看这行:这个 hook 基本**啥都不做**,返回一个 observable.onSubscribe,
        // 然后立马 .call(subscriber),和我们的猜想一致。

        // 至于这个奇怪的 hook,按源码说的,包裹一层 hook 是为了 intercept and/or decorate
        // 反正这个 hook 之后还会反复出现,传入啥就传出啥,直接忽视即可
        return hook.onSubscribeReturn(subscriber);
    } catch (Throwable e) { ... }
}

 

变换


到此为止,一个简单的流程走完了,一切都很完美。那么我们引入变换吧,先构造一个场景:
1. 创建一个 Observable,它包含了数据 “hello rxjava.”。
2. 做一个变换 string -> string.length()
3. 然后订阅之,用 println() 打印出来。

    Observable.OnSubscribe mOnSubscribeStr = new Observable.OnSubscribe() {
        @Override
        public void call(Subscriber subscriber) {
            subscriber.onNext("hello rxjava.");
        }
    };

    private Func1 mFuncStrToInt = new Func1() {
        @Override
        public Integer call(String string) {
            return string.length(); // 1: string -> string.length()
        }
    };

    private Subscriber mSubscriberInt = new Subscriber() {
        @Override
        public void onCompleted() {}

        @Override
        public void onError(Throwable e) {}

        @Override
        public void onNext(Integer length) {
            System.out.println(length);
        }
    };

    private void testMap(){
        Observable.create(mOnSubscribeStr)
                .map(mFuncStrToInt) // 2: 把映射的 function 传进去,最后其实传给了 lift()
                .subscribe(mSubscriberInt);
    }

这次就不走流程了,大体上实在之前的流程上插入了一段 map()。我们来想想怎么实现这个 map 呢?

咱们列一下这么需求,然后试着实现一下 myMap():
Observable< Integer > myMap(Observable< String> observableStr, Func1< String, Integer > func);

map 调用之后需要改变范型(String -> Integer), 这意味着我们需要 new 一个 Observable< Integer > 出来。 new 了一个 ObservableInt 之后,我们的 subscribe 是针对这个新的 ObservableInt 的。我们必须与旧的 ObservableStr 建立起联系(让新的 ObservableInt 去订阅旧的 ObservableStr)。

实现如下,跟着流程 1~7 走一遍:

    // ---------- testMyMap ------------
    private static void testMyMap(){
        System.out.println("testMyMap: ");

        myMap(mObservableStr, mFuncStrToInt)    // 等价于 mObservableStr.map(mFuncStrToInt)
                .subscribe(mSubscriberInt); // 1: 传入 mSubscriberInt , 由 onSubscribeInt.call() 接收

        System.out.println("");
    }

    private static Observable myMap(final Observable observableStr
            , final Func1 func){

        Observable.OnSubscribe onSubscribeInt = new Observable.OnSubscribe() {
            @Override
            public void call(final Subscriber subscriberInt) { // 2: 接收 mSubscriberInt

                Subscriber subscriberStr = new Subscriber() {   // 3: 包裹 subscriberInt, 转换成 subscriberStr
                    @Override
                    public void onCompleted() {
                        subscriberInt.onCompleted();
                    }

                    @Override
                    public void onError(Throwable e) {
                        subscriberInt.onError(e);
                    }

                    @Override
                    public void onNext(String string) { // 5: 回溯到最上游 create 里, 再不断下发数据
                        Integer length = func.call(string); // 6: 变换
                        subscriberInt.onNext(length);   // 7: 下发新数据给下游
                    }
                };

                observableStr.subscribe(subscriberStr); // 4: 包裹了 mSubscriberInt 以后, 丢给上游 (即订阅了上游)
                // 等价于 mOnSubscribeStr.call(subscriberStr), 有点像责任链模式
            }
        };

        return Observable.create(onSubscribeInt);
    }
    // ---------- testMyMap ------------

看下输出效果:和官方的 map() 效果一致

testSubscribe: 
hello rxjava.

testMap: 
length = 13

testMyMap: 
length = 13

总结一下:
1. 每次变换会 new 出一个 Observable,它们串成一个数据流
2. 下游 create 的时候,会 subscribe 上游,上下游之间是一种 订阅 —— 发布 关系。

感觉是一种奇怪的责任链模式每个 Observable 只和前/后继(上/下游)是低耦合关系,与其他对象毫无关联。

 

下面来看官方 map 源码:

// map
public final  Observable map(Func1 func) {
    return lift(new OperatorMap(func)); // OperatorMap: 等于我们的步骤 5~7,变换、下发数据
}

// 先看 OperatorMap
// Operator接口: 基本是把 Subscriber 转换成 Subscriber
public interface Operator extends Func1, Subscriber> {
}

// 这里的  故意反着传进去,有点绕体会一下
public final class OperatorMap implements Operator {
    private final Func1 transformer;

    public OperatorMap(Func1 transformer) {
        this.transformer = transformer;
    }

    @Override
    public Subscriber call(final Subscriber o) {
        return new Subscriber(o) { // 等于我们的步骤 3: 包裹 subscriberInt, 转换成 subscriberStr
            @Override
            public void onCompleted() { o.onCompleted(); }

            @Override
            public void onError(Throwable e) { o.onError(e); }

            @Override
            public void onNext(T t) {
                try {
                    o.onNext(transformer.call(t)); // 等于我们的步骤 5~7,变换、下发数据
                } catch (Throwable e) {
                    Exceptions.throwOrReport(e, this, t);
                }
            }
        };
    }
}

// 再看 lift
public final  Observable lift(final Operator operator) {
    return new Observable(new OnSubscribe() {
        @Override
        public void call(Subscriber o) { // 等于我们的步骤 2: 参数 o 接收 mSubscriberInt
            try {
                Subscriber st = hook.onLift(operator).call(o); // 等于我们的步骤 3:
                try {
                    ... 
                    this.onSubscribe.call(st); // 等于我们的步骤 4: 订阅上游

                } catch (Throwable e) { ... }
            } catch (Throwable e) { ... }
        }
    });
}



盗了张图来,有关这个回溯下发的流程,生动又形象:(每行都是 new 出来的新的 Observable )

这里写图片描述
 

参考


RxJava基本流程和lift源码分析
快速理解RxJava源码的设计理念
 

源码


有个 main() 函数,直接能 run 起来,请放心食用。

package com.example.jinliangshan.littlezhihu.home.rxjava;

import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;

/**
 * Created by jinliangshan on 16/9/13.
 * 注意变量命名, 成员变量带前缀 m, 局部变量不带
 */
public class RxjavaTest {

    public static void main(String args[]){
        testSubscribe();
//        testMySubscribe();
        testMap();
        testMyMap();
    }

    // ---------- test ------------
    private static Observable.OnSubscribe mOnSubscribeStr = new Observable.OnSubscribe() {
        @Override
        public void call(Subscriber subscriber) {
            subscriber.onNext("hello rxjava.");
        }
    };

    private static Observable mObservableStr = Observable.create(mOnSubscribeStr);  // 数据源

    private static Subscriber mSubscriberStr = new Subscriber() {
        @Override
        public void onCompleted() {}

        @Override
        public void onError(Throwable e) {}

        @Override
        public void onNext(String string) {
            System.out.println(string);
        }
    };

    private static void testSubscribe(){
        System.out.println("testSubscribe: ");

        mObservableStr
                .subscribe(mSubscriberStr);

        System.out.println("");
    }
    // ---------- test ------------

    // ---------- testMySubscribe ------------
    private static void testMySubscribe(){
        System.out.println("testMySubscribe: ");

        mySubscribe(mObservableStr, mSubscriberStr);

        System.out.println("");
    }

    private static void mySubscribe(Observable observableStr, Subscriber subscriberStr){
        mOnSubscribeStr.call(subscriberStr);
    }
    // ---------- testMySubscribe ------------

    // ---------- testMap ------------
    private static Func1 mFuncStrToInt = new Func1() {
        @Override
        public Integer call(String string) {
            return string.length();
        }
    };

    private static Subscriber mSubscriberInt = new Subscriber() {
        @Override
        public void onCompleted() {}

        @Override
        public void onError(Throwable e) {}

        @Override
        public void onNext(Integer length) {
            System.out.println("length = " + length);
        }
    };

    private static void testMap(){
        System.out.println("testMap: ");

        mObservableStr
                .map(mFuncStrToInt)
                .subscribe(mSubscriberInt);

        System.out.println("");
    }
    // ---------- testMap ------------

    // ---------- testMyMap ------------
    private static void testMyMap(){
        System.out.println("testMyMap: ");

        myMap(mObservableStr, mFuncStrToInt)    // 等价于 mObservableStr.map(mFuncStrToInt)
                .subscribe(mSubscriberInt); // 1: 传入 mSubscriberInt , 由 onSubscribeInt.call() 接收

        System.out.println("");
    }

    private static Observable myMap(final Observable observableStr, final Func1 func){

        Observable.OnSubscribe onSubscribeInt = new Observable.OnSubscribe() {
            @Override
            public void call(final Subscriber subscriberInt) { // 2: 接收 mSubscriberInt

                Subscriber subscriberStr = new Subscriber() {   // 3: 包裹 subscriberInt, 转换成 subscriberStr
                    @Override
                    public void onCompleted() {
                        subscriberInt.onCompleted();
                    }

                    @Override
                    public void onError(Throwable e) {
                        subscriberInt.onError(e);
                    }

                    @Override
                    public void onNext(String string) { // 5: 回溯到最上游 create 里, 再不断下发数据
                        Integer length = func.call(string); // 6: 变换
                        subscriberInt.onNext(length);   // 7: 下发新数据给下游
                    }
                };

                observableStr.subscribe(subscriberStr); // 4: 包裹了 mSubscriberInt 以后, 丢给上游 (即订阅了上游)
                // 等价于 mOnSubscribeStr.call(subscriberStr), 有点像责任链模式
            }
        };

        return Observable.create(onSubscribeInt);
    }
    // ---------- testMyMap ------------
}
相关文章
最新文章
热点推荐