Suppose I have a chain of transformations on an Observable:

operation() 
    .flatMap(toSomething()) 
    .map(toSomethingElse()) 
    .flatMap(toYetSomethingElse()) 
    .subscribeOn(Schedulers.newThread()) 
    .observeOn(AndroidSchedulers.mainThread()) 
    .subscribe(observer); 

Are all of these operations synchronous except for the last call to flatMap()? Or do all of the operations run on the thread I told it to subscribe on?

Answer:

I figured this out with a test. The following test passes (which means all emissions on the Observable happen on the same background thread):

    volatile long observableThreadId; 
 
    @Test 
    public void transformedObservables_shouldRunInSameThread() { 
 
        Observable.from(new String[]{"a", "b", "c"}) // 
            .flatMap(new Func1<String, Observable<Object>>() { 
                @Override public Observable<Object> call(String s) { 
                    observableThreadId = Thread.currentThread().getId(); 
                    return Observable.from((Object) s); 
                } 
            }) // 
            .map(new Func1<Object, String>() { 
                @Override public String call(Object o) { 
                    long id = Thread.currentThread().getId(); 
                    if (id != observableThreadId) { 
                        throw new RuntimeException("Thread ID mismatch"); 
                    } 
 
                    return (String) o; 
                } 
            }) // 
            .flatMap(new Func1<String, Observable<String>>() { 
                @Override public Observable<String> call(String s) { 
                    long id = Thread.currentThread().getId(); 
                    if (id != observableThreadId) { 
                        throw new RuntimeException("Thread ID mismatch"); 
                    } 
 
                    return Observable.from(s); 
                } 
            }) // 
            .subscribeOn(Schedulers.newThread()) // 
            .observeOn(Schedulers.currentThread()) // 
            .subscribe(new Observer<String>() { 
                @Override public void onCompleted() { 
                    assertThat(Thread.currentThread().getId()).isNotEqualTo(observableThreadId); 
                } 
 
                @Override public void onError(Throwable throwable) { 
 
                } 
 
                @Override public void onNext(String s) { 
 
                } 
            }); 
 
        System.out.println("blah"); 
    } 

================================
Update:

A better answer can actually be found in the ReactiveX documentation on Scheduler:

By default, an Observable and the chain of operators that you apply to it will do its work, and will notify its observers, on the same thread on which its Subscribe method is called. The SubscribeOn operator changes this behavior by specifying a different Scheduler on which the Observable should operate. The ObserveOn operator specifies a different Scheduler that the Observable will use to send notifications to its observers.

... the SubscribeOn operator designates which thread the Observable will begin operating on, no matter at what point in the chain of operators that operator is called. ObserveOn, on the other hand, affects the thread that the Observable will use below where that operator appears. For this reason, you may call ObserveOn multiple times at various points during the chain of Observable operators in order to change on which threads certain of those operators operate.
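
To make the hand-off described in that quote concrete, here is a minimal sketch that models it with plain java.util.concurrent executors. It is not RxJava and does not use any RxJava API: the class name, the executor variables, and the thread names "background" and "observer" are all made up for illustration. One single-thread executor stands in for the subscribeOn Scheduler and runs the source plus every operator above the hand-off; the other stands in for the observeOn Scheduler and runs only the observer callbacks.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Toy model only: plain executors standing in for Schedulers, no RxJava involved.
class SchedulerHandoffSketch {

    /** Runs the toy pipeline and returns "stage@thread" entries. */
    static List<String> run() throws InterruptedException {
        // Hypothetical stand-ins for the two Schedulers; the thread names are invented.
        ExecutorService subscribeOn =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "background"));
        ExecutorService observeOn =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "observer"));
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch completed = new CountDownLatch(1);

        // "subscribeOn": the source and every operator above the hand-off run on this thread.
        subscribeOn.execute(() -> {
            for (String item : new String[] {"a", "b", "c"}) {
                record(log, "flatMap " + item);
                String upper = item.toUpperCase();
                record(log, "map " + upper);
                // "observeOn": everything below this point is handed to the other thread.
                observeOn.execute(() -> record(log, "onNext " + upper));
            }
            observeOn.execute(() -> {
                record(log, "onCompleted");
                completed.countDown();
            });
        });

        boolean finished = completed.await(5, TimeUnit.SECONDS);
        subscribeOn.shutdown();
        observeOn.shutdown();
        if (!finished) {
            throw new IllegalStateException("pipeline did not complete in time");
        }
        return new ArrayList<>(log);
    }

    // Appends the stage label together with the name of the thread that executed it.
    static void record(List<String> log, String stage) {
        log.add(stage + "@" + Thread.currentThread().getName());
    }

    public static void main(String[] args) throws InterruptedException {
        run().forEach(System.out::println);
    }
}
```

Every flatMap and map entry ends in "@background", and every onNext and onCompleted entry ends in "@observer". The relative order of entries from the two threads can vary between runs, so only the thread names are meaningful here.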

